Hi all,

In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to store the offsets in Kafka in order to achieve restartability of the streaming application. (I have already implemented this with checkpoints, but checkpointing won't work for us because we will need to change the code in production.)
Checking the Spark Streaming documentation for the "store offsets in Kafka itself" approach (http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself), it describes:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Based on this, I modified the code as follows:

val kafkaMap: Map[String, Object] = KafkaConfigs

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaMap))

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Filter out the records with empty values and build tuples of the form
  // (topicName, stringValue_read_from_kafka_topic)
  stream.map(x => ("topicName", x.value)).filter(x => !x._2.trim.isEmpty).foreachRDD(processRDD _)

  // Some time later, after outputs have completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

def processRDD(rdd: RDD[(String, String)]): Unit = {
  // Process further to HDFS
}

Now, when I try to start the streaming application, it does not start, and the logs show the following:

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
        at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
        at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)

Can anyone suggest what we are missing here, or help us understand it?

Regards,
Arpan
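
P.S. For context, here is a minimal, self-contained sketch of what I am trying to end up with, based on my reading of the documented pattern. The object name, broker address, group id, batch interval, and HDFS output path are all placeholders, and the save to HDFS is just a stand-in for our real processing:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object OffsetCommitSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("OffsetCommitSketch"), Seconds(30))

    // Placeholder Kafka parameters; auto-commit is disabled so commitAsync controls the offsets.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streaming-app-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaParams))

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Stand-in for the real processing: keep non-empty values and write them out per batch.
      rdd.map(record => ("topicName", record.value))
        .filter { case (_, value) => !value.trim.isEmpty }
        .saveAsTextFile(s"hdfs:///tmp/streaming-output/${System.currentTimeMillis}")

      // After this batch's output has completed, commit its offsets back to Kafka.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}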