Here is the signature of the write method used by HFileOutputFormat:
      public void write(ImmutableBytesWritable row, KeyValue kv)

That is, the record writer expects a KeyValue as the value, not a Put, which
is why the cast in your stack trace fails.
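
For what it's worth, here is a minimal sketch of how your snippet might look when it emits KeyValue instead of Put. I have not run it against your cluster, so treat it as a starting point rather than a verified solution. Assumptions on my side: it runs in spark-shell (so `sc` already exists), table "t1" has a column family "cf", the row keys arrive in ascending order within each partition (the HFile writer requires sorted input), and the final LoadIncrementalHFiles step is my suggestion for your second question, not something taken from your code. I also pass job.getConfiguration to the save call, since Job.getInstance copies the configuration and configureIncrementalLoad sets its options on that copy.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.0.x

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

// Configure the job so HFileOutputFormat knows the table's regions
// and the key/value classes it will receive.
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Same output directory as in the original snippet.
val hfileDir = "/tmp/xxxx8"

// Emit (row key, KeyValue) pairs instead of (row key, Put).
// 1 to 10 encoded with Bytes.toBytes(Int) is already in ascending byte order.
val rdd = sc.parallelize(1 to 10).map { x =>
  val rowKey = Bytes.toBytes(x)
  val kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(rowKey), kv)
}

// Write the HFiles; the value class is now KeyValue, and the configuration
// is the job's copy, which carries the incremental-load settings.
rdd.saveAsNewAPIHadoopFile(hfileDir, classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)

// Assumed follow-up step: move the generated HFiles into the table's regions.
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileDir), table)
```

If your real row keys are not already sorted, you would need to sort the RDD by key before saving, otherwise the HFile writer is likely to reject out-of-order keys.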

On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbi...@gmail.com> wrote:

> Hi Team,
>
> I need some help writing Scala code to bulk load some data into HBase.
> *Env:*
> hbase 0.94
> spark-1.0.2
>
> I am trying the code below to bulk load some data into the HBase table “t1”.
>
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
> import org.apache.hadoop.hbase.KeyValue
> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>
> val conf = HBaseConfiguration.create()
> val tableName = "t1"
> val table = new HTable(conf, tableName)
>
> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
> val job = Job.getInstance(conf)
> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
> job.setMapOutputValueClass (classOf[KeyValue])
> HFileOutputFormat.configureIncrementalLoad (job, table)
>
> val num = sc.parallelize(1 to 10)
> val rdd = num.map(x=>{
>     val put: Put = new Put(Bytes.toBytes(x))
>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
> })
> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable],
> classOf[Put], classOf[HFileOutputFormat], conf)
>
>
> However, I always get the error below:
> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
> at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> My questions are:
> 1. Is there sample code for bulk loading into HBase directly?
> Can we use saveAsNewAPIHadoopFile?
>
> 2. Is there any other way to do this?
> For example, first write an HFile to HDFS, and then use an HBase command to
> bulk load it?
> Is there any sample code in Scala?
>
> Thanks.
>
>
>
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>
