// Hi, in the following example using mapWithState, I set the checkpoint interval to 1 minute.
// From the log, Spark still writes to the checkpoint directory every second.
// I would appreciate it if someone could point out what I have done wrong.

/**
 * Stateful word count over a socket text stream using `mapWithState`.
 *
 * Usage: `MapWithStateDemo <hostname> <port> [<checkpointDir>]`
 *
 * Reads newline-delimited text from `hostname:port` in 1-second batches, splits each line on
 * single spaces, and keeps a running count per word, seeded with ("hello", 1) and ("world", 1).
 * The state spec uses a 10-second timeout, so keys that receive no data for that long are dropped.
 *
 * On the checkpointing question above: `stateDstream.checkpoint(Minutes(1L))` sets the
 * data-checkpoint interval of that DStream only.
 * NOTE(review): the files written every second are most likely Spark's metadata checkpoints
 * (the DStream graph and pending batches). StreamingContext writes these once per batch interval
 * whenever `ssc.checkpoint(dir)` is set, independently of any DStream's checkpoint interval.
 * Confirm this against the Spark version in use.
 */
object MapWithStateDemo {

  private val Usage = "Usage: MapWithStateDemo <hostname> <port> [<checkpointDir>]"

  /**
   * Entry point.
   *
   * @param args `hostname`, `port`, and optionally a checkpoint directory. When the directory
   *             is omitted, it is derived from the classpath location as before.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) usageAndExit()
    val hostname = args(0)
    val port = parsePort(args(1)).getOrElse(usageAndExit(s"Invalid port: ${args(1)}"))
    // An explicit third argument wins; otherwise fall back to the original classpath-derived path.
    val checkpointDir = args.drop(2).headOption.orElse(defaultCheckpointDir).getOrElse(
      usageAndExit("Cannot derive a checkpoint directory from the classpath; pass <checkpointDir>"))

    val sparkConf = new SparkConf()
      .setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for the mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the words in the input
    // stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState.
    // This gives a DStream of (word, cumulative count) pairs.
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      // Because of the timeout below, Spark calls this function one final time for an idle key
      // with isTimingOut() == true (and no new value). Updating a timing-out state throws, so
      // only update live state.
      if (!state.isTimingOut()) state.update(sum)
      (word, sum)
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    // Data-checkpoint interval for this stream's RDDs. It should be a multiple of the
    // 1-second batch interval.
    // NOTE(review): this does not change how often the per-batch metadata checkpoints are
    // written. Also confirm, for the Spark version in use, that MapWithStateDStream forwards
    // this interval to its internal state stream.
    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }

  /** Prints `message` (when non-empty) and the usage line to stderr, then exits with status 1. */
  private def usageAndExit(message: String = ""): Nothing = {
    if (message.nonEmpty) System.err.println(message)
    System.err.println(Usage)
    sys.exit(1)
  }

  /** Parses a TCP port, returning None unless `raw` is an integer in 1..65535. */
  private def parsePort(raw: String): Option[Int] =
    scala.util.Try(raw.toInt).toOption.filter(p => p >= 1 && p <= 65535)

  /**
   * The original default location: a `checkpoint` directory two levels above the classpath root.
   * For a typical `target/scala-2.x/classes` build layout, that is `target/checkpoint`.
   *
   * Returns None when the classpath root is not a plain file-system directory (for example, when
   * running from a jar, `getResource("/")` may be null or a non-file URL), or when it has fewer
   * than two parent directories.
   */
  private def defaultCheckpointDir: Option[String] =
    Option(getClass.getResource("/"))
      .filter(_.getProtocol == "file")
      .flatMap(url => Option(new File(url.toURI).getParentFile))
      .flatMap(dir => Option(dir.getParentFile))
      .map(targetDir => new File(targetDir, "checkpoint").getPath)
}

// Thanks in advance for any assistance!
// Shing