I have upgraded to Spark 2.0 and am experimenting with Kafka 0.10.0. I
have a stream from which I extract data, and I would like to update the
Kafka offsets as each partition is handled. With Spark 1.6, or with Spark
2.0 and Kafka 0.8.2, I was able to update the offsets, but now there seems
to be no way to do so. Here is an example:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.HasOffsetRanges

val stream = getStream

stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.foreachPartition { events =>
        val partId = TaskContext.get.partitionId
        val offsets = offsetRanges(partId)

        // Do something with the data

        // Update the offsets for this partition so that, at most, only
        // this partition's data would be duplicated
    }
}

With the new stream I can call `commitAsync` with the offsets, but the
drawback here is that it only updates the offsets after the entire RDD has
been handled. This can be a real issue for getting near "exactly once"
semantics.
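For reference, this is the RDD-level pattern I mean, as a minimal sketch
(getStream above is just a placeholder for however the direct stream is
created):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.foreachPartition { events =>
        // Do something with the data
    }

    // Offsets are only queued for commit here, after every partition of
    // the RDD has been processed
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}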

With the new logic, each partition has a Kafka consumer associated with
it; however, there is no access to it from user code. I have looked at the
CachedKafkaConsumer classes and there is no way to get at the cache either,
so I cannot call a commit on the offsets myself.

Beyond that, I have tried to use the new Kafka 0.10 consumer API directly,
but I always run into errors, as it requires one to subscribe to the topic
and get partitions assigned. I only want to update the offsets in Kafka.

Any ideas on how I might work with the Kafka API to set the offsets, or on
getting Spark to add logic that allows committing offsets on a per-partition
basis, would be helpful.

Thanks,

Ivan


