Here is the method signature used by HFileOutputFormat: public void write(ImmutableBytesWritable row, KeyValue kv)
Meaning, KeyValue is expected, not Put. You need to emit (ImmutableBytesWritable, KeyValue) pairs from your RDD; see the sketches after the quoted message below.

On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbi...@gmail.com> wrote:
> Hi Team,
>
> I need some help on writing a Scala program to bulk load some data into HBase.
> *Env:*
> hbase 0.94
> spark-1.0.2
>
> I am trying the below code to just bulk load some data into hbase table “t1”.
>
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put
> import org.apache.hadoop.hbase.client.HTable
> import org.apache.hadoop.hbase.mapred.TableOutputFormat
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
> import org.apache.hadoop.hbase.KeyValue
> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>
> val conf = HBaseConfiguration.create()
> val tableName = "t1"
> val table = new HTable(conf, tableName)
>
> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
> val job = Job.getInstance(conf)
> job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
> job.setMapOutputValueClass(classOf[KeyValue])
> HFileOutputFormat.configureIncrementalLoad(job, table)
>
> val num = sc.parallelize(1 to 10)
> val rdd = num.map(x => {
>   val put: Put = new Put(Bytes.toBytes(x))
>   put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>   (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
> })
> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable],
>   classOf[Put], classOf[HFileOutputFormat], conf)
>
> However I am always getting the below error:
> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
>         at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> My questions are:
> 1. Do we have sample code to bulk load into hbase directly?
> Can we use saveAsNewAPIHadoopFile?
>
> 2. Is there any other way to do this?
> For example, first write an hfile on hdfs, and then use an hbase command to
> bulk load?
> Any sample code using scala?
>
> Thanks.
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
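A minimal, untested sketch of the same job emitting KeyValue values instead of Put. It keeps the table name “t1”, column family “cf”, qualifier “c1” and output path /tmp/xxxx8 from the quoted code, and assumes it runs in a Spark shell where sc is available:

import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

// configureIncrementalLoad picks up compression, block size, etc. from the table
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Emit (ImmutableBytesWritable, KeyValue) pairs, which is what
// HFileOutputFormat's writer expects. Note that keys must arrive in
// ascending byte order within each output file; 1 to 10 serialized
// with Bytes.toBytes already satisfies that here.
val num = sc.parallelize(1 to 10)
val rdd = num.map { x =>
  val rowKey = Bytes.toBytes(x)
  val kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(rowKey), kv)
}

// Pass job.getConfiguration so the settings applied by
// configureIncrementalLoad are carried into the output format.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)

For real data you would also need to make sure the row keys are sorted before writing, e.g. by sorting on the raw key before mapping to KeyValue.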
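For question 2 (write the HFiles to HDFS first, then load them): once the HFiles exist under /tmp/xxxx8, they can be handed to LoadIncrementalHFiles. A sketch, assuming the table “t1” already exists and the files were written as above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "t1")

// Moves the HFiles written to /tmp/xxxx8 into the regions of table t1
val loader = new LoadIncrementalHFiles(conf)
loader.doBulkLoad(new Path("/tmp/xxxx8"), table)

The same step can be run from the shell with: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/xxxx8 t1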