Hello,

I'm planning on adding a listener to update ZooKeeper (for monitoring
purposes) when a batch completes. What would be a consistent way to index
the offsets for a given batch? In the PR above, it seems like batchTime was
used, but is there a way to build this batchTime -> offsets mapping in the
streaming app itself?

Something like:

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import scala.collection.concurrent.TrieMap

// batchTime -> offsets, shared with the listener thread on the driver
val offsetMap = TrieMap[Time, Array[OffsetRange]]()
var currOffsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  // capture the offsets before any shuffle loses the Kafka partitioning
  currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  // do stuff with shuffling involved, then update DynamoDB
}
offsetMap += ((validTime, currOffsetRanges)) // validTime is undefined here; see below

Then, in the listener (onBatchCompleted), retrieve the offsetRanges
associated with the completed batchTime and update ZK accordingly.
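
Roughly like this, I imagine (just a sketch, assuming the offsetMap above
and a StreamingContext named ssc; ZkOffsetListener is my own name and
updateZk is a hypothetical helper):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class ZkOffsetListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // batchTime is the key the streaming app recorded offsets under
    val batchTime = batchCompleted.batchInfo.batchTime
    // remove() rather than get() so the map doesn't grow without bound
    offsetMap.remove(batchTime).foreach { ranges =>
      // updateZk(ranges): hypothetical helper that would write each
      // range's untilOffset to the corresponding ZK node
    }
  }
}

ssc.addStreamingListener(new ZkOffsetListener)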

I'm unsure how to define validTime above. Any help/advice/words of warning
would be appreciated. 
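
For what it's worth, the closest candidate I've found for validTime is the
two-argument transform overload, which hands the batch Time over along with
the RDD. A sketch, assuming the stream's element type is (String, String)
and that this Time is the same value the listener later sees as
batchInfo.batchTime (if that equivalence isn't guaranteed, I'd rather know
now):

import org.apache.spark.rdd.RDD

directKafkaStream.transform { (rdd: RDD[(String, String)], batchTime: Time) =>
  // runs on the driver once per batch at job-generation time,
  // so the map entry exists before onBatchCompleted fires
  offsetMap(batchTime) = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  // shuffle-heavy work, then the DynamoDB update
}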

Thanks!


Side note: I also considered the partition/batch updating examples in
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/,
but since my job involves shuffling and DynamoDB/ZK aren't really the same
"database", neither approach would work in this case. If I'm missing
something about ZK here, please let me know too.


