Yes, the workaround is the same one suggested in the JIRA for accumulators
and broadcast variables: make a singleton object that lazily initializes the
HBaseContext. Because it is a singleton, it won't get serialized through the
checkpoint; after recovery it will be lazily reinitialized. This is the exact
approach I took for `SQLContext.getOrCreate()`
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L1259>.
Take a look at the code.
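
Roughly, the pattern would look something like this. This is only a minimal,
untested sketch: the object name (HBaseContextSingleton) and the import path
are my own assumptions, so adjust them to whatever version of SparkOnHBase you
are using; the HBaseContext constructor is the same one your code calls below.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkContext
    import com.cloudera.spark.hbase.HBaseContext  // adjust to your SparkOnHBase package

    // Hypothetical lazily-initialized holder for the HBaseContext.
    object HBaseContextSingleton {

      // Held in the object (a static field on the JVM), not captured in a
      // closure, so it is never serialized into the checkpoint.
      private var instance: HBaseContext = _

      def getOrCreate(sc: SparkContext, hconf: Configuration): HBaseContext = synchronized {
        if (instance == null) {
          // Built lazily the first time it is needed after a (re)start,
          // including after the driver recovers from a checkpoint.
          instance = new HBaseContext(sc, hconf)
        }
        instance
      }
    }

The idea is that the per-batch code reaches for the singleton (via
HBaseContextSingleton.getOrCreate) rather than holding on to an HBaseContext
constructed inside functionToCreateContext, so a driver recovered from the
checkpoint simply rebuilds it on first use instead of trying to deserialize it.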

On Tue, Oct 27, 2015 at 11:19 PM, Amit Hora <hora.a...@gmail.com> wrote:

> Thanks for sharing the link. Yes, I understand that accumulator and
> broadcast variable state is not recovered from the checkpoint, but is there
> any way to say that the HBaseContext in this case should not be recovered
> from the checkpoint and must instead be reinitialized?
> ------------------------------
> From: Adrian Tanase <atan...@adobe.com>
> Sent: 27-10-2015 18:08
> To: Amit Singh Hora <hora.a...@gmail.com>; user@spark.apache.org
> Subject: Re: SPARKONHBase checkpointing issue
>
> Does this help?
>
> https://issues.apache.org/jira/browse/SPARK-5206
>
>
>
> On 10/27/15, 1:53 PM, "Amit Singh Hora" <hora.a...@gmail.com> wrote:
>
> >Hi all,
> >
> >I am using Cloudera's SparkOnHBase to bulk insert into HBase. Please find
> >the code below:
> >object test {
> >
> >  def main(args: Array[String]): Unit = {
> >    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
> >    val checkpointDirectory = conf.getString("spark.checkpointDir")
> >    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
> >      functionToCreateContext(checkpointDirectory)
> >    })
> >
> >    ssc.start()
> >    ssc.awaitTermination()
> >  }
> >
> >  def getHbaseContext(sc: SparkContext, conf: Config): HBaseContext = {
> >    println("always gets created")
> >    val hconf = HBaseConfiguration.create()
> >    val timeout = conf.getString("hbase.zookeepertimeout")
> >    val master = conf.getString("hbase.hbase_master")
> >    val zk = conf.getString("hbase.hbase_zkquorum")
> >    val zkport = conf.getString("hbase.hbase_zk_port")
> >
> >    hconf.set("zookeeper.session.timeout", timeout)
> >    hconf.set("hbase.client.retries.number", Integer.toString(1))
> >    hconf.set("zookeeper.recovery.retry", Integer.toString(1))
> >    hconf.set("hbase.master", master)
> >    hconf.set("hbase.zookeeper.quorum", zk)
> >    hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
> >    hconf.set("hbase.zookeeper.property.clientPort", zkport)
> >
> >    val hbaseContext = new HBaseContext(sc, hconf)
> >    hbaseContext
> >  }
> >
> >  def functionToCreateContext(checkpointDirectory: String): StreamingContext = {
> >    println("creating for first time")
> >    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
> >    val brokerlist = conf.getString("kafka.broker")
> >    val topic = conf.getString("kafka.topic")
> >
> >    val Array(brokers, topics) = Array(brokerlist, topic)
> >
> >    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample")
> >    sparkConf.set("spark.cleaner.ttl", "2")
> >    sparkConf.setMaster("local[2]")
> >
> >    val topicsSet = topic.split(",").toSet
> >    val batchduration = conf.getString("spark.batchduration").toInt
> >    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(batchduration))
> >    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
> >
> >    val kafkaParams = Map[String, String](
> >      "metadata.broker.list" -> brokerlist,
> >      "auto.offset.reset" -> "smallest")
> >    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
> >      ssc, kafkaParams, topicsSet)
> >    val lines = messages.map(_._2)
> >
> >    getHbaseContext(ssc.sparkContext, conf).streamBulkPut[String](
> >      lines,
> >      "ecs_test",
> >      (putRecord) => {
> >        if (putRecord.length() > 0) {
> >          // convert the JSON string to a Map
> >          val mapper = new ObjectMapper()
> >          val maprecord: HashMap[String, String] = mapper.readValue(putRecord,
> >            new TypeReference[HashMap[String, String]]() {})
> >
> >          val ts: Long = maprecord.get("ts").toLong
> >          val tweetID: Long = maprecord.get("id").toLong
> >          val key = ts + "_" + tweetID
> >
> >          val put = new Put(Bytes.toBytes(key))
> >          maprecord.foreach(kv => {
> >            put.add(Bytes.toBytes("test"), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2))
> >          })
> >          put
> >        } else {
> >          null
> >        }
> >      },
> >      false)
> >
> >    ssc
> >  }
> >}
> >
> >I am not able to recover from the checkpoint after a restart; I always get
> >"Unable to getConfig from broadcast".
> >
> >After debugging further, I can see that the method creating the HBaseContext
> >actually broadcasts the configuration and the context object passed to it.
> >
> >As a solution, I just want to recreate the HBaseContext in every case,
> >whether the checkpoint exists or not.
> >
> >
> >
> >--
> >View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
> >Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >---------------------------------------------------------------------
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>
