Here: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
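
The pattern that example uses is StreamingContext.getOrCreate: the whole stream setup goes into a factory function, which Spark calls only when no usable checkpoint exists in the given directory. Below is a minimal sketch of that idea, not the linked example itself. The object name, checkpoint path, host, port, and the simplified per-key counting logic are all placeholders chosen for illustration, and it assumes Spark 1.3 or later, with the master supplied via spark-submit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableStateSketch {
  // Hypothetical placeholders: substitute your own checkpoint path and socket source.
  val checkpointDir = "hdfs:///tmp/checkpoints_dir"
  val host = "localhost"
  val port = 9999

  // Builds a fresh context. Every DStream must be defined in here so that
  // the graph is saved in the checkpoint and can be rebuilt on restart.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    // Simplified stateful step (an assumption, not the original logic):
    // a running count per key, where the key is the first comma-separated field.
    val counts = ssc.socketTextStream(host, port)
      .map(line => (line.split(",")(0), 1L))
      .updateStateByKey[Long]((current: Seq[Long], prev: Option[Long]) =>
        Some(prev.getOrElse(0L) + current.sum))
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context from checkpointDir if a checkpoint is present,
    // otherwise calls createContext() to build a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On the first run the factory function builds the context; on later runs the context and its state should come back from the checkpoint directory instead. A checkpoint written by an older version of the application code may fail to load after the code changes, in which case the directory has to be cleared or a new one used.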

On Sat, 27 Feb 2016, 20:42 Sebastian Piu, <sebastian....@gmail.com> wrote:

> You need to create the streaming context using an existing checkpoint for
> it to work
>
> See sample here
>
> On Sat, 27 Feb 2016, 20:28 Vinti Maheshwari, <vinti.u...@gmail.com> wrote:
>
>> Hi All,
>>
>> I wrote a Spark Streaming program with a stateful transformation.
>> It seems like my Spark Streaming application is doing the computation
>> correctly with checkpointing.
>> But when I terminate my program and start it again, it does not read the
>> previous checkpoint data and starts from the beginning. Is this the
>> expected behaviour?
>>
>> Do I need to change anything in my program so that it will remember the
>> previous data and start the computation from there?
>>
>> Thanks in advance.
>>
>> For reference, my program:
>>
>>
>>   def main(args: Array[String]): Unit = {
>>     val conf = new SparkConf().setAppName("HBaseStream")
>>     val sc = new SparkContext(conf)
>>     val ssc = new StreamingContext(sc, Seconds(5))
>>     val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
>>
>>     ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
>>     inputStream.print(1)
>>     val parsedStream = inputStream
>>       .map(line => {
>>         val splitLines = line.split(",")
>>         (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
>>       })
>>     import breeze.linalg.{DenseVector => BDV}
>>     import scala.util.Try
>>
>>     val state: DStream[(String, Array[Long])] =
>>       parsedStream.updateStateByKey(
>>         (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>>           prev.map(_ +: current).orElse(Some(current))
>>             .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>>         })
>>     state.checkpoint(Duration(10000))
>>     state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>>
>>     // Start the computation
>>     ssc.start()
>>     // Wait for the computation to terminate
>>     ssc.awaitTermination()
>>
>>   }
>> }
>>
>>
>> Regards,
>>
>> ~Vinti
>>