Hi, Jim

Your generated RDD should be of type RDD[(ImmutableBytesWritable, KeyValue)], while your current one is RDD[(ImmutableBytesWritable, Put)]. You can go like this, and the result should be of type RDD[(ImmutableBytesWritable, KeyValue)], which can then be written with saveAsNewAPIHadoopFile:

val result = num.flatMap(v => {
    keyValueBuilder(v).map(v => (v, 1))
  }).map(v => (new ImmutableBytesWritable(v._1.getBuffer(), v._1.getRowOffset(), v._1.getRowLength()), v._1))

where keyValueBuilder would be a function from your record type to a List[KeyValue], for example:

val keyValueBuilder = (data: (Int, Int)) => {
  val rowkeyBytes = Bytes.toBytes(data._1)
  val colfam = Bytes.toBytes("cf")
  val qual = Bytes.toBytes("c1")
  val value = Bytes.toBytes("val_xxx")
  val kv = new KeyValue(rowkeyBytes, colfam, qual, value)
  List(kv)
}
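Putting the two pieces together against num = sc.parallelize(1 to 10) from your code, a minimal self-contained sketch could look like this (here keyValueBuilder takes a plain Int to match num, the intermediate (v, 1) pair is dropped, and the /tmp/xxxx13 output path is reused; these are assumptions made to keep the example consistent, not anything required):

// Sketch only: one KeyValue per input Int, written out as HFiles.
// Reuses the imports, conf and table setup from the code further down the thread.
val keyValueBuilder = (data: Int) => {
  val rowkeyBytes = Bytes.toBytes(data)
  val colfam = Bytes.toBytes("cf")
  val qual = Bytes.toBytes("c1")
  val value = Bytes.toBytes("val_xxx")
  List(new KeyValue(rowkeyBytes, colfam, qual, value))
}

// RDD[(ImmutableBytesWritable, KeyValue)]: the key is the row key pulled
// back out of the KeyValue's backing buffer.
val result = num.flatMap(keyValueBuilder).map(kv =>
  (new ImmutableBytesWritable(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()), kv))

// HFileOutputFormat expects row keys in sorted order; 1 to 10 encoded with
// Bytes.toBytes(Int) already are, so no extra sort is done in this sketch.
result.saveAsNewAPIHadoopFile(
  "/tmp/xxxx13",
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat],
  conf)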
Thanks,
Sun

fightf...@163.com

From: Jim Green
Date: 2015-01-28 04:44
To: Ted Yu
CC: user
Subject: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile

I used the below code, and it still failed with the same error. Does anyone have experience with bulk loading using Scala? Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <openkbi...@gmail.com> wrote:

Thanks Ted. Could you give me a simple example that loads one row of data into hbase? How should I generate the KeyValue? I tried multiple times and still cannot figure it out.

On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Here is the method signature used by HFileOutputFormat:

public void write(ImmutableBytesWritable row, KeyValue kv)

Meaning, KeyValue is expected, not Put.
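Put differently, the smallest change to the map in the code further down the thread is to build a KeyValue for the pair value instead of a Put. A sketch, reusing the same cf / c1 / value_xxx cell:

// Sketch: same rows as before, but the pair value is a KeyValue, which is
// what HFileOutputFormat's write(ImmutableBytesWritable, KeyValue) accepts.
val rdd = num.map(x => {
  val rowkey = Bytes.toBytes(x)
  val kv = new KeyValue(rowkey, Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(rowkey), kv)
})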
On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbi...@gmail.com> wrote:

Hi Team,

I need some help with writing a Scala program to bulk load some data into hbase.

Env:
hbase 0.94
spark-1.0.2

I am trying the below code to just bulk load some data into hbase table “t1”.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

However, I am always getting the below error:

java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
  at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have any sample code to bulk load into hbase directly? Can we use saveAsNewAPIHadoopFile?
2. Is there any other way to do this? For example, first write an HFile on HDFS and then use an hbase command to bulk load? Any sample code using Scala?

Thanks.

--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
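On question 2: saveAsNewAPIHadoopFile only writes the HFiles under the output directory; they still have to be handed to HBase afterwards. One way to do that, not shown in this thread, is the LoadIncrementalHFiles API (the same thing the completebulkload tool wraps). A sketch, assuming the /tmp/xxxx13 output path plus the conf and table ("t1") from the code above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Sketch: move the HFiles written by saveAsNewAPIHadoopFile into table "t1".
val loader = new LoadIncrementalHFiles(conf)
loader.doBulkLoad(new Path("/tmp/xxxx13"), table)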