The Kafka commit API isn't transactional; you aren't going to get exactly-once behavior out of it even if you were committing offsets on a per-partition basis. This doesn't really have anything to do with Spark; the old code you posted was already inherently broken.

Make your outputs idempotent and use commitAsync.
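In code, that looks roughly like the following (untested sketch; it assumes `stream` is the DStream returned by KafkaUtils.createDirectStream, and `upsert` is a stand-in for whatever idempotent write your sink supports, not a Spark API):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { events =>
        // idempotent write: key the upsert on something unique per record, so a
        // replayed partition overwrites its earlier output instead of duplicating it
        events.foreach(record => upsert(record))
      }

      // commit the batch's offsets back to Kafka only after the output succeeds;
      // still at-least-once, but idempotence makes any replay harmless
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

Note the commit happens once per batch, after the outputs; per-partition commit granularity stops mattering once the writes are idempotent.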
Or store offsets transactionally in your own data store; there's a rough sketch of that below the quoted message.

On Fri, Sep 2, 2016 at 5:50 PM, vonnagy <i...@vadio.com> wrote:
> I have upgraded to Spark 2.0 and am experimenting with Kafka 0.10.0. I have
> a stream from which I extract the data, and I would like to update the Kafka
> offsets as each partition is handled. With Spark 1.6, or with Spark 2.0 and
> Kafka 0.8.2, I was able to update the offsets, but now there seems to be no
> way to do so. Here is an example:
>
> val stream = getStream
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   rdd.foreachPartition { events =>
>     val partId = TaskContext.get.partitionId
>     val offsets = offsetRanges(partId)
>
>     // Do something with the data
>
>     // Update the offsets for the partition so that, at most, the
>     // partition's data would be duplicated
>   }
> }
>
> With the new stream I could call `commitAsync` with the offsets, but the
> drawback is that it would only update the offsets after the entire RDD is
> handled. This can be a real issue for near "exactly once".
>
> With the new logic, each partition has a Kafka consumer associated with it,
> but there is no access to it. I have looked at the CachedKafkaConsumer
> classes, and there is no way to get at the cache either, so I cannot call a
> commit on the offsets.
>
> Beyond that, I have tried to use the new Kafka 0.10 APIs directly, but I
> always run into errors, because they require one to subscribe to the topic
> and get assigned partitions. I only want to update the offsets in Kafka.
>
> Any ideas would be helpful on how I might work with the Kafka API to set the
> offsets, or how Spark might add logic to allow committing offsets on a
> per-partition basis.
>
> Thanks,
>
> Ivan
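For the second option, the shape is: write each partition's results and its ending offset in the same local transaction, then seed createDirectStream from your store on restart. A minimal sketch, assuming a JDBC store; the connection URL and the `results` and `offsets` tables are placeholders for your own data layer, not anything Spark or Kafka provides:

    import java.sql.DriverManager

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { events =>
        // indexing by partition id is safe here: KafkaRDD partitions line up
        // 1:1 with the offset ranges as long as there is no shuffle in between
        val osr = offsetRanges(TaskContext.get.partitionId)

        val conn = DriverManager.getConnection("jdbc:...") // placeholder URL
        conn.setAutoCommit(false)
        try {
          val insert = conn.prepareStatement("insert into results (value) values (?)")
          events.foreach { record =>
            insert.setString(1, record.value)
            insert.executeUpdate()
          }

          // same transaction: record this partition's ending offset
          val update = conn.prepareStatement(
            "update offsets set until_offset = ? where topic = ? and kafka_partition = ?")
          update.setLong(1, osr.untilOffset)
          update.setString(2, osr.topic)
          update.setInt(3, osr.partition)
          update.executeUpdate()

          conn.commit() // results and offsets succeed or fail together
        } finally {
          conn.close() // an uncommitted transaction rolls back on close
        }
      }
    }

On restart, read the stored offsets back out and hand them to createDirectStream via ConsumerStrategies.Assign (or Subscribe with a starting-offsets map), so the stream resumes from what your transaction log says was actually processed.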