Hi all,

In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to store
the offsets in Kafka itself in order to make the streaming application
restartable. (I have already implemented this with checkpoints, but since we
will need to change code in production, checkpointing won't work for us.)

Checking the Spark Streaming documentation for the "storing offsets in Kafka
itself" approach:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself,
which describes:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


Based on this, I modified the code as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaMap: Map[String, Object] = KakfaConfigs

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream(ssc, PreferConsistent,
    Subscribe[String, String](Array("topicName"), kafkaMap))

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Filter out records with empty values and map each record to a tuple of
  // (topicName, string_value_read_from_kafka_topic)
  stream.map(x => ("topicName", x.value)).filter(x =>
    !x._2.trim.isEmpty).foreachRDD(processRDD _)

  // Some time later, after outputs have completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


def processRDD(rdd: RDD[(String, String)]): Unit = {
  // Process further and write to HDFS
}
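
For reference, KakfaConfigs is built roughly along the lines of the sketch
below (the broker list and group.id are placeholders, not our real values);
auto commit is disabled so that commitAsync is the only thing updating the
committed offsets in Kafka:

import org.apache.kafka.common.serialization.StringDeserializer

// Illustrative values only -- brokers and group.id are placeholders.
val KakfaConfigs: Map[String, Object] = Map(
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "our-streaming-app-group",
  "auto.offset.reset" -> "latest",
  // Auto commit disabled; the commitAsync call above does the committing.
  "enable.auto.commit" -> (false: java.lang.Boolean)
)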

Now, when I try to start the streaming application, it does not start, and
looking at the logs, here is what we see:

java.lang.IllegalStateException: Adding new inputs, transformations,
and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)


Can anyone suggest, or help us understand, what we are missing here?


Regards,
Arpan
