Thanks much, Amit and Sebastian. It worked.

Regards,
~Vinti
On Sat, Feb 27, 2016 at 12:44 PM, Amit Assudani <aassud...@impetus.com> wrote:

> Your context is not being created from the checkpoint; use getOrCreate.
>
> From: Vinti Maheshwari <vinti.u...@gmail.com>
> Date: Saturday, February 27, 2016 at 3:28 PM
> To: user <user@spark.apache.org>
> Subject: Spark streaming not remembering previous state
>
> Hi All,
>
> I wrote a Spark Streaming program with a stateful transformation. My
> Spark Streaming application seems to compute correctly with
> checkpointing. But when I terminate the program and start it again, it
> does not read the previous checkpoint data and starts from the
> beginning. Is this the expected behaviour?
>
> Do I need to change anything in my program so that it remembers the
> previous data and continues the computation from there?
>
> Thanks in advance.
>
> For reference, my program:
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("HBaseStream")
>     val sc = new SparkContext(conf)
>     val ssc = new StreamingContext(sc, Seconds(5))
>     val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
>
>     ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
>     inputStream.print(1)
>     val parsedStream = inputStream
>       .map(line => {
>         val splitLines = line.split(",")
>         (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
>       })
>     import breeze.linalg.{DenseVector => BDV}
>     import scala.util.Try
>
>     val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>       (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>         prev.map(_ +: current).orElse(Some(current))
>           .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>       })
>     state.checkpoint(Duration(10000))
>     state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
>     // Start the computation
>     ssc.start()
>     // Wait for the computation to terminate
>     ssc.awaitTermination()
>
>   }
> }
>
> Regards,
>
> ~Vinti
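Amit's suggestion refers to `StreamingContext.getOrCreate(checkpointPath, creatingFunc)`. Below is a minimal sketch of how the program above might be restructured around it, assuming Spark 1.x Streaming. The important part is that the whole DStream graph, including the `ssc.checkpoint(...)` call, is built inside the factory function: on a restart Spark restores the graph and the per-key state from the checkpoint instead of running that function. Assumptions, not from the original program: the object name `HBaseStream` is borrowed from the app name, the helper `sumArrays` is hypothetical and replaces the Breeze reduction with plain Scala (element-wise sum for equal-length arrays, `None` otherwise, which should match the original's `Try` behaviour), and `state.print()` stands in for the original `Blaher.blah` sink.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object HBaseStream {
  // Checkpoint directory copied from the original program; it must still be
  // reachable on restart for recovery to happen.
  val checkpointDir =
    "hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir"

  // Hypothetical helper: element-wise sum of equal-length arrays. Returns None
  // when there is nothing to sum or the lengths disagree, so the key is dropped.
  def sumArrays(arrays: Seq[Array[Long]]): Option[Array[Long]] =
    if (arrays.isEmpty || arrays.exists(_.length != arrays.head.length)) None
    else Some(arrays.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))

  // Builds a brand-new context and the complete DStream graph.
  // getOrCreate calls this only when no usable checkpoint exists.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
    val parsedStream = inputStream.map { line =>
      val splitLines = line.split(",")
      (splitLines(1), splitLines.drop(2).map(_.trim.toLong))
    }

    // Running per-key state: previous totals plus this batch's arrays.
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>
        sumArrays(prev.toSeq ++ current))
    state.checkpoint(Seconds(10)) // same interval as Duration(10000)

    // Placeholder output; the original foreachRDD sink would go here.
    state.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context (graph and state) from checkpointDir if present,
    // otherwise builds a fresh one with createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this shape, the first run finds no checkpoint and calls `createContext`, while a restart resumes from the saved state. Note that Spark Streaming generally cannot recover from a checkpoint written by a different version of the application code, so after changing the DStream logic the checkpoint directory typically has to be cleared.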