Can you show the complete stack trace ? Subclass of Mutation is expected. Put is a subclass.
Have you tried replacing BoxedUnit with Put in your code ? Cheers On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora <hora.a...@gmail.com> wrote: > Hi All, > > I am using below code to stream data from kafka to hbase ,everything works > fine until i restart the job so that it can restore the state from > checkpoint directory ,but while trying to restore the state it give me > below > error > > ge 0.0 (TID 0, localhost): java.lang.ClassCastException: > scala.runtime.BoxedUnit cannot be cast to > org.apache.hadoop.hbase.client.Mutation > > please find below code > > tweetsRDD.foreachRDD(rdd=>{ > val hconf = HBaseConfiguration.create(); > hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename) > hconf.set("zookeeper.session.timeout", > conf.getString("hbase.zookeepertimeout")); > hconf.set("hbase.client.retries.number", Integer.toString(1)); > hconf.set("zookeeper.recovery.retry", Integer.toString(1)); > hconf.set("hbase.master", conf.getString("hbase.hbase_master")); > > hconf.set("hbase.zookeeper.quorum",conf.getString("hbase.hbase_zkquorum")); > hconf.set("zookeeper.znode.parent", "/hbase-unsecure"); > hconf.set("hbase.zookeeper.property.clientPort", > conf.getString("hbase.hbase_zk_port")); > hconf.setClass("mapreduce.outputformat.class", > classOf[TableOutputFormat[String]], classOf[OutputFormat[String, > BoxedUnit]]) > > rdd.map ( record =>(new ImmutableBytesWritable,{ > > > var maprecord = new HashMap[String, String]; > val mapper = new ObjectMapper(); > > //convert JSON string to Map > > maprecord = mapper.readValue(record.toString(), > new TypeReference[HashMap[String, String]]() {}); > > > var ts:Long= maprecord.get("ts").toLong > var tweetID:Long= maprecord.get("id").toLong > val key=ts+"_"+tweetID; > val put=new Put(Bytes.toBytes(key)) > maprecord.foreach(kv => { > // println(kv._1+" - "+kv._2) > > > put.add(Bytes.toBytes(colfamily.value),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2)) > > > } > ) > put > > } > ) > ).saveAsNewAPIHadoopDataset(hconf) > > }) > > > > help me out in solving this as it is urgent for me > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/HBase-Spark-Streaming-giving-error-after-restore-tp25089.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >