> private HTable table;

You should declare the table variable within the apply() method.
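Here is a minimal sketch of that advice in plain Java, with no Spark or HBase dependencies. `FakeTable` is a hypothetical stand-in for HTable, and `processPartition()` plays the role of apply(). The table is a local variable that is created once per partition and closed through try-with-resources (HTable also has a close() method). This way the function object never carries a table in a field that Spark would have to serialize, and each partition releases its table when it finishes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PerPartitionResource {

    // Hypothetical stand-in for HTable: must be closed and, like HTable,
    // does not implement Serializable.
    static class FakeTable implements AutoCloseable {
        final List<String> written = new ArrayList<>();
        boolean closed = false;

        void put(List<String> batch) {
            written.addAll(batch); // copies the elements, so the caller may clear its list
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Every table opened so far, kept only so callers can inspect them.
    static final List<FakeTable> opened = new ArrayList<>();

    // Plays the role of apply(): the table is local to this partition and is
    // closed automatically when the partition has been processed.
    static void processPartition(Iterator<String> rows, int batchSize) {
        try (FakeTable table = new FakeTable()) {
            opened.add(table);
            List<String> puts = new ArrayList<>();
            while (rows.hasNext()) {
                puts.add(rows.next());
                if (puts.size() >= batchSize) {
                    table.put(puts);
                    puts.clear();
                }
            }
            if (!puts.isEmpty()) {
                table.put(puts); // send the last partial batch
            }
        }
    }

    public static void main(String[] args) {
        processPartition(Arrays.asList("a", "b", "c").iterator(), 2);
        processPartition(Arrays.asList("d").iterator(), 2);
        // Prints: 2 tables, all closed: true
        System.out.println(opened.size() + " tables, all closed: "
                + (opened.get(0).closed && opened.get(1).closed));
    }
}
```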
BTW, which HBase release are you using?
I see you implement the write buffering (batching of Puts) yourself. You can
make use of the following HTable method instead:
public void setWriteBufferSize(long writeBufferSize) throws IOException
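HTable's client-side write buffer flushes based on the accumulated size in bytes, not on a fixed count like the `puts.size() >= 500` check in the quoted code. The following is a simplified plain-Java model of that idea, not HBase code. The class `SizeBoundedWriteBuffer` and its byte-array "mutations" are hypothetical. In the HBase clients of that era, the buffer is typically only used once auto-flush is turned off (for example with setAutoFlush(false); the exact setter differs between releases). Anything still buffered at the end is sent by flushCommits() or close().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SizeBoundedWriteBuffer {

    private final long writeBufferSize;              // flush threshold in bytes
    private final List<byte[]> buffer = new ArrayList<>();
    private long currentBufferSize = 0;              // bytes currently buffered
    final List<Integer> flushedBatchSizes = new ArrayList<>(); // mutations per flush, for inspection

    SizeBoundedWriteBuffer(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    // Buffer one mutation; flush once the buffered bytes exceed the threshold.
    void put(byte[] mutation) {
        buffer.add(mutation);
        currentBufferSize += mutation.length;
        if (currentBufferSize > writeBufferSize) {
            flush();
        }
    }

    // Send everything buffered as one batch (a real client would issue the RPCs here).
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(buffer.size());
        buffer.clear();
        currentBufferSize = 0;
    }

    public static void main(String[] args) {
        SizeBoundedWriteBuffer buf = new SizeBoundedWriteBuffer(10);
        for (String v : new String[] {"aaaa", "bbbb", "cccc", "dd"}) {
            buf.put(v.getBytes(StandardCharsets.UTF_8));
        }
        buf.flush(); // final flush at the end of the partition
        System.out.println(buf.flushedBatchSizes); // prints [3, 1]
    }
}
```

With a 10-byte threshold, the three 4-byte values trigger a flush when the total reaches 12 bytes. The final flush() then sends the remaining 2-byte value.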
Cheers
On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel wrote:
> Hi,
>
> Please find the code below.
>
> dataFrame.foreachPartition(
>     new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
>
>         private HTable table;
>
>         private char ROWKEY_SEPERATOR = '\u0000';
>
>         public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
>             Configuration config = HBaseConfiguration.create();
>             config.set("hbase.zookeeper.quorum", "????");
>             config.set("hbase.zookeeper.property.clientPort", "???");
>             config.set("zookeeper.znode.parent", "????");
>
>             try {
>                 table = new HTable(config, "table_name");
>             } catch (Exception e) {
>                 throw new RuntimeException(e);
>             }
>
>             List<Put> puts = new ArrayList<Put>();
>             try {
>                 while (rows.hasNext()) {
>                     Row row = rows.next();
>                     Map<String, Object> map = new HashMap<String, Object>();
>                     String[] fieldNames = row.schema().fieldNames();
>                     for (int i = 0; i < fieldNames.length; i++) {
>                         map.put(fieldNames[i].toUpperCase(), row.get(i));
>                     }
>                     puts.add(mapToPut(map));
>                     if (puts.size() >= 500) {
>                         table.put(puts);
>                         puts.clear();
>                     }
>                 }
>                 table.put(puts);
>             } catch (Exception e) {
>                 throw new RuntimeException(e);
>             }
>             return BoxedUnit.UNIT;
>         }
>
>         private Put mapToPut(Map<String, Object> map) throws IOException {
>             try {
>                 Put put = new Put(getRowKey(map));
>                 String value = null;
>                 for (String key : map.keySet()) {
>                     value = (map.get(key) == null ? "" : map.get(key).toString());
>                     put.add(Bytes.toBytes("0"), Bytes.toBytes(key), Bytes.toBytes(value));
>                 }
>                 return put;
>             } catch (Exception e) {
>                 e.printStackTrace();
>                 throw e;
>             }
>         }
>
>         private byte[] getRowKey(Map<String, Object> map) {
>             StringBuilder builder = new StringBuilder();
>             return Bytes.toBytes(builder.toString());
>         }
>     });
>
> Regards,
> Nishant
>
> On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu wrote:
>
>> Can you show us the code for loading Hive data into HBase?
>>
>> There shouldn't be a 'return' statement in that code.
>>
>> Cheers
>>
>>
>>
>> On Jun 20, 2015, at 10:10 PM, Nishant Patel wrote:
>>
>> Hi,
>>
>> I am loading data from a Hive table into HBase after doing some manipulation.
>>
>> I am getting a 'Task not serializable' error.
>>
>> My code is below.
>>
>> public class HiveToHbaseLoader implements Serializable {
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         String hbaseTableName = args[0];
>>         String hiveQuery = args[1];
>>
>>         SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
>>                 .setMaster("????");
>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>
>>         HiveContext hiveContext = new HiveContext(sc.sc());
>>
>>         hiveContext.setConf("hive.metastore.uris", "?????");
>>
>>         DataFrame dataFrame = hiveContext.sql(hiveQuery);
>>
>>         dataFrame.foreachPartition(
>>             new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
>>
>>                 // Logic to load rows from Hive into HBase.
>>
>>             });
>>     }
>> }
>>
>>
>> I get the following error:
>>
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
>>     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875)
>>     at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46)
>> Caused by: java.io.NotSerializableException: com.philips.bda.HiveToHbaseLoader$1
>> Serialization stack:
>>     - object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, value: <function1>)
>>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ... 5 more
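The `Caused by` line names the anonymous class `HiveToHbaseLoader$1` itself. This suggests that scala.runtime.AbstractFunction1 does not implement java.io.Serializable, so an anonymous subclass of it cannot be Java-serialized even though HiveToHbaseLoader is. Here is a plain-Java reproduction. `Function1Base` is a hypothetical stand-in for AbstractFunction1, and `isSerializable()` roughly mimics the Java-serialization check that the stack trace shows Spark's ClosureCleaner performing. A named class that extends the same base and also implements Serializable passes the check.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationDemo {

    // Hypothetical stand-in for scala.runtime.AbstractFunction1:
    // an abstract base class that does NOT implement Serializable.
    static abstract class Function1Base<T, R> {
        abstract R apply(T input);
    }

    // Fix: a named class that extends the base AND implements Serializable.
    static class CountChars extends Function1Base<String, Integer> implements Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        Integer apply(String input) {
            return input.length();
        }
    }

    // Roughly what the closure check does: attempt plain Java serialization.
    static boolean isSerializable(Object f) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(f);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Like `new AbstractFunction1<...>() {...}` inside main(): this compiles to
    // ClosureSerializationDemo$1, which inherits no Serializable marker.
    static Function1Base<String, Integer> anonymous() {
        return new Function1Base<String, Integer>() {
            @Override
            Integer apply(String input) {
                return input.length();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("anonymous: " + isSerializable(anonymous()));     // prints anonymous: false
        System.out.println("named: " + isSerializable(new CountChars()));    // prints named: true
    }
}
```

Applied to the Spark code, one option is a named (static nested or top-level) class that extends AbstractFunction1 and implements Serializable. Another is to go through the Java API, using dataFrame.javaRDD().foreachPartition(...) with a VoidFunction<java.util.Iterator<Row>>, because Spark's Java function interfaces extend Serializable. In either case, keep the HTable out of the function's fields: make it local to apply(), or at least transient.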
>>
>>
>> --
>> Regards,
>> Nishant
>>
>>
>
>
> --
> Regards,
> Nishant Patel
>
>