Hello, I'm planning to add a listener that updates ZooKeeper (for monitoring purposes) when a batch completes. What would be a consistent way to index the offsets for a given batch? In the PR above, batchTime seems to have been used, but is there a way to build this batchTime -> offsets mapping in the streaming app itself?
Something like:

    var currOffsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      // ... do stuff with shuffling involved, then update DynamoDB
    }

    offsetMap += ((validTime, currOffsetRanges))

Then, in the listener's onBatchCompleted, retrieve the offsetRanges associated with the completed batchTime and update ZK accordingly. I'm unsure how to define validTime above. Any help/advice/words of warning would be appreciated. Thanks!

Side note: I also considered the partition/batch updating examples in https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/, but since shuffling occurs and DynamoDB/ZK aren't really the same "database", neither would work in this case. If I'm missing something about ZK here, please let me know too.
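For reference, here's a fuller sketch of what I have in mind. It uses the two-argument transform overload (which passes the batch Time along with the RDD, so validTime doesn't have to be guessed at) and a concurrent map shared with a StreamingListener. It assumes the directKafkaStream and StreamingContext (ssc) from above, and updateZk is a hypothetical stub I'd still need to implement (e.g. with Curator):

    import scala.collection.concurrent.TrieMap
    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Thread-safe map from batch time to the offsets consumed in that batch;
    // written by transform (on the driver) and read by the listener.
    val offsetMap = TrieMap.empty[Time, Array[OffsetRange]]

    // Hypothetical helper -- would be replaced with real ZooKeeper writes
    // (e.g. via Curator or kafka.utils.ZkUtils).
    def updateZk(topic: String, partition: Int, offset: Long): Unit =
      println(s"ZK update: $topic/$partition -> $offset")

    // The two-argument transform overload also receives the batch Time;
    // record the offsets keyed by that Time. The cast must happen on the
    // first transformation, before any shuffle.
    directKafkaStream.transform { (rdd, batchTime) =>
      offsetMap(batchTime) = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      rdd.count() // placeholder for the shuffling work + DynamoDB updates
    }

    // On batch completion, look up that batch's offsets and push them to ZK.
    class ZkOffsetListener extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val batchTime = batchCompleted.batchInfo.batchTime
        // remove() keeps the map from growing without bound
        offsetMap.remove(batchTime).foreach { ranges =>
          ranges.foreach(r => updateZk(r.topic, r.partition, r.untilOffset))
        }
      }
    }

    ssc.addStreamingListener(new ZkOffsetListener)

I'm assuming the Time passed to transform matches batchInfo.batchTime in the listener exactly; if that assumption is wrong, please say so.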