Thanks Sun.
My understanding is that saveAsNewAPIHadoopFile saves the data as HFiles on HDFS.

Is it possible to use saveAsNewAPIHadoopDataset to load directly into HBase?
If so, is there any sample code for that?

Thanks!
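
For reference, here is a minimal sketch of writing straight into an HBase table with saveAsNewAPIHadoopDataset. It assumes a spark-shell session (so `sc` already exists), the table "t1" with column family "cf" used elsewhere in this thread, and the HBase 0.94 client API; it has not been verified against Spark 1.0.2 specifically. Note that this goes through the normal client write path using Put objects, so it is not an HFile bulk load.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark versions

// Assumption: table "t1" with column family "cf" already exists.
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "t1")

// The Job object is only used to carry the output-format settings.
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Build (rowkey, Put) pairs; TableOutputFormat (mapreduce package) accepts Put values.
val puts = sc.parallelize(1 to 10).map { x =>
  val row = Bytes.toBytes(x)
  val put = new Put(row)
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(row), put)
}

// Each partition opens the table and sends its Puts to the region servers.
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
```

The sketch uses the `mapreduce` variant of TableOutputFormat rather than the `mapred` one imported in the quoted code, because saveAsNewAPIHadoopDataset works with the new Hadoop API.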

On Tue, Jan 27, 2015 at 6:07 PM, fightf...@163.com <fightf...@163.com>
wrote:

> Hi, Jim
> Your generated RDD should be of type RDD[(ImmutableBytesWritable,
> KeyValue)], but your current type is RDD[(ImmutableBytesWritable, Put)].
> You can do something like this; the result should be of type
> RDD[(ImmutableBytesWritable, KeyValue)], which can be saved with
> saveAsNewAPIHadoopFile:
>      val result = num.flatMap(v => keyValueBuilder(v))
>        .map(kv => (new ImmutableBytesWritable(kv.getBuffer(),
>          kv.getRowOffset(), kv.getRowLength()), kv))
>
> where keyValueBuilder would be defined as a function T => List[KeyValue];
> for example:
>            val keyValueBuilder = (data: (Int, Int))  =>{
>                val rowkeyBytes = Bytes.toBytes(data._1)
>                val colfam = Bytes.toBytes("cf")
>                val qual = Bytes.toBytes("c1")
>                val value = Bytes.toBytes("val_xxx")
>
>                val kv = new KeyValue(rowkeyBytes,colfam,qual,value)
>                List(kv)
>           }
>
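
For reference, the suggestion above can be put together into an end-to-end sketch that writes HFiles and then loads them into the table. It assumes a spark-shell session (so `sc` exists), the HBase 0.94 API, an existing table "t1" with column family "cf", and a small single-region table; the output path "/tmp/hfiles_out" is a hypothetical placeholder. HFileOutputFormat expects cells in sorted row order within each file, which holds here only because the input is already ascending; larger or multi-region data would need explicit sorting and partitioning by region.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark versions

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "t1")

// Let HBase fill in compression and related settings for the target table.
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Emit (rowkey, KeyValue) pairs instead of (rowkey, Put) pairs.
val kvs = sc.parallelize(1 to 10).map { x =>
  val row = Bytes.toBytes(x)
  val kv = new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(row), kv)
}

// Hypothetical output directory; it must not exist before the job runs.
val hfileDir = "/tmp/hfiles_out"
kvs.saveAsNewAPIHadoopFile(hfileDir, classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)

// Move the generated HFiles into the table's regions.
new LoadIncrementalHFiles(job.getConfiguration).doBulkLoad(new Path(hfileDir), table)
```

The last step can also be run from the command line with HBase's completebulkload tool instead of calling LoadIncrementalHFiles from Scala.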
>
> Thanks,
> Sun
> ------------------------------
> fightf...@163.com
>
>
> *From:* Jim Green <openkbi...@gmail.com>
> *Date:* 2015-01-28 04:44
> *To:* Ted Yu <yuzhih...@gmail.com>
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
> I used the code below, and it still failed with the same error.
> Does anyone have experience with bulk loading using Scala?
> Thanks.
>
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
> import org.apache.hadoop.hbase.KeyValue
> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>
> val conf = HBaseConfiguration.create()
> val tableName = "t1"
> val table = new HTable(conf, tableName)
>
> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
> val job = Job.getInstance(conf)
> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
> job.setMapOutputValueClass (classOf[KeyValue])
> HFileOutputFormat.configureIncrementalLoad (job, table)
>
> val num = sc.parallelize(1 to 10)
> val rdd = num.map(x=>{
>     val put: Put = new Put(Bytes.toBytes(x))
>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
> })
> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13", classOf[ImmutableBytesWritable],
> classOf[KeyValue], classOf[HFileOutputFormat], conf)
>
>
>
> On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <openkbi...@gmail.com> wrote:
>
>> Thanks Ted. Could you give me a simple example of loading one row of data
>> into HBase? How should I generate the KeyValue?
>> I tried multiple times and still cannot figure it out.
>>
>> On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Here is the method signature used by HFileOutputFormat:
>>>       public void write(ImmutableBytesWritable row, KeyValue kv)
>>>
>>> Meaning, KeyValue is expected, not Put.
>>>
>>> On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I need some help writing Scala code to bulk load some data into HBase.
>>>> *Env:*
>>>> hbase 0.94
>>>> spark-1.0.2
>>>>
>>>> I am trying the code below to bulk load some data into the HBase table
>>>> “t1”.
>>>>
>>>> import org.apache.spark._
>>>> import org.apache.spark.rdd.NewHadoopRDD
>>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>> import org.apache.hadoop.fs.Path;
>>>> import org.apache.hadoop.hbase.HColumnDescriptor
>>>> import org.apache.hadoop.hbase.util.Bytes
>>>> import org.apache.hadoop.hbase.client.Put;
>>>> import org.apache.hadoop.hbase.client.HTable;
>>>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>>>> import org.apache.hadoop.mapred.JobConf
>>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>> import org.apache.hadoop.mapreduce.Job
>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>>>> import org.apache.hadoop.hbase.KeyValue
>>>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>>>
>>>> val conf = HBaseConfiguration.create()
>>>> val tableName = "t1"
>>>> val table = new HTable(conf, tableName)
>>>>
>>>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>>>> val job = Job.getInstance(conf)
>>>> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
>>>> job.setMapOutputValueClass (classOf[KeyValue])
>>>> HFileOutputFormat.configureIncrementalLoad (job, table)
>>>>
>>>> val num = sc.parallelize(1 to 10)
>>>> val rdd = num.map(x=>{
>>>>     val put: Put = new Put(Bytes.toBytes(x))
>>>>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>>>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>>>> })
>>>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8",
>>>> classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat],
>>>> conf)
>>>>
>>>>
>>>> However, I always get the error below:
>>>> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
>>>> at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>>>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>>>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> My questions are:
>>>> 1. Is there sample code for bulk loading into HBase directly?
>>>> Can we use saveAsNewAPIHadoopFile?
>>>>
>>>> 2. Is there any other way to do this?
>>>> For example, first write an HFile to HDFS, and then use an HBase command
>>>> to bulk load it?
>>>> Is there any sample code in Scala?
>>>>
>>>> Thanks.
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> www.openkb.info
>>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks,
>> www.openkb.info
>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>
>
>
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>
>


-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
