I've been reading the documentation on accessing offsetRanges and updating ZK yourself when using DirectKafkaInputDStream (from createDirectStream), along with the code changes in this PR: https://github.com/apache/spark/pull/4805.
I'm planning to add a listener that updates ZK (for monitoring purposes) when a batch completes. What would be a consistent way to index the offsets for a given batch? In the PR above, batchTime seems to be used as the key, but is there a way to build this batchTime -> offsets mapping in the streaming app itself? Something like:

    var currOffsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      // ... do stuff ...
    }

    offsetMap += ((validTime, currOffsetRanges))

Then, in the listener (onBatchCompleted), retrieve the offsetRanges associated with the completed batchTime and update ZK accordingly. I'm unsure how to define validTime above; a fuller sketch of what I have in mind follows at the end of this message. Any help/advice would be appreciated. Thanks!
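For reference, here's a rough, untested sketch of the whole flow I have in mind. It assumes ssc is my StreamingContext, swaps the transform trick above for foreachRDD's two-argument overload (which passes in the batch Time), and relies on a hypothetical updateZk helper I'd still have to write to commit offsets to ZooKeeper:

    import scala.collection.mutable
    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Shared map from batch time to the offset ranges consumed in that batch.
    val offsetMap = mutable.Map[Time, Array[OffsetRange]]()

    // The two-argument foreachRDD variant hands over the batch Time directly,
    // which looks like a natural key for offsetMap.
    directKafkaStream.foreachRDD { (rdd, time) =>
      offsetMap.synchronized {
        offsetMap += ((time, rdd.asInstanceOf[HasOffsetRanges].offsetRanges))
      }
      // ... do stuff with the batch ...
    }

    // Listener that commits offsets to ZK once the batch has fully completed.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit = {
        val batchTime = completed.batchInfo.batchTime
        offsetMap.synchronized {
          // updateZk: hypothetical helper that writes the offsets to ZooKeeper.
          offsetMap.remove(batchTime).foreach(updateZk)
        }
      }
    })

If the Time passed to foreachRDD is the same one that onBatchCompleted later reports via batchInfo.batchTime, that would resolve my validTime question, but I haven't verified that the two actually line up.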