I have upgraded to Spark 2.0 and am experimenting with Kafka 0.10.0. I have a stream from which I extract data, and I would like to update the Kafka offsets as each partition is handled. With Spark 1.6, or with Spark 2.0 and Kafka 0.8.2, I was able to update the offsets, but now there seems to be no way to do so. Here is an example:
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    val stream = getStream
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { events =>
        val partId = TaskContext.get.partitionId
        val offsets = offsetRanges(partId)
        // Do something with the data
        // Update the offsets for this partition, so that at most this
        // partition's data would be duplicated on failure
      }
    }

With the new stream I can call `commitAsync` with the offsets, but the drawback is that the offsets are only updated after the entire RDD has been handled. This can be a real issue for getting close to "exactly once" semantics. With the new logic, each RDD partition has a Kafka consumer associated with it, but there is no access to it. I have looked at the CachedKafkaConsumer classes and there is no way to get at the cache either, so I cannot call a commit on the offsets myself. Beyond that, I have tried to use the new Kafka 0.10 APIs directly, but I always run into errors because they require one to subscribe to the topic and be assigned partitions, whereas I only want to update the offsets in Kafka. Any ideas would be helpful on how I might work with the Kafka API to set the offsets, or on whether Spark could add logic to allow committing offsets on a per-partition basis.

Thanks,
Ivan
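P.S. For concreteness, here is roughly what the per-RDD commit looks like with the new API, as far as I understand `CanCommitOffsets` (a sketch; the point is that all partitions' offsets go to Kafka together, only after the whole batch):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the whole RDD ...
      // All partitions are committed at once, after the batch completes.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

And here is the kind of per-partition commit I am after, sketched with a standalone KafkaConsumer on the executor that is assigned (not subscribed) to the single partition and commits that partition's untilOffset. The broker address and the "my-app-offsets" group id are placeholders; I would use a dedicated bookkeeping group id, since the coordinator may reject simple (generation -1) commits against a group that already has active subscribed members, and opening a consumer per partition per batch is obviously not cheap:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { events =>
        val range = offsetRanges(TaskContext.get.partitionId)
        // ... process the events for this partition ...

        val props = new Properties()
        props.put("bootstrap.servers", "broker:9092")   // placeholder
        props.put("group.id", "my-app-offsets")         // placeholder bookkeeping group
        props.put("enable.auto.commit", "false")
        props.put("key.deserializer",
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer",
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        try {
          val tp = new TopicPartition(range.topic, range.partition)
          consumer.assign(Collections.singletonList(tp))
          // untilOffset is one past the last message in this partition's batch
          consumer.commitSync(
            Collections.singletonMap(tp, new OffsetAndMetadata(range.untilOffset)))
        } finally {
          consumer.close()
        }
      }
    }

If the offsets go to a separate group id like this, the application would also have to read them back on startup (e.g., by passing explicit starting offsets to the consumer strategy), so this is only bookkeeping, not a drop-in replacement for the stream's own group.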