Hi,

When I create a stream with KafkaUtils.createDirectStream, I can explicitly set 
the starting position to "largest" or "smallest" (via "auto.offset.reset" in 
kafkaParams), i.e. where to start reading the topic from.
What if I have previous checkpoints (in HDFS, for example) with offsets, and I 
want to start reading from the last checkpoint?
In the source code of KafkaUtils.createDirectStream I see the following:

    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

    (for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else {
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
    ...

So it turns out that this code path gives me no option to start reading from 
checkpoints (and the offsets stored in them)?
Am I right?
How can I force Spark to start reading from the saved offsets (in checkpoints)? 
Is it possible at all, or do I need to store the offsets in an external datastore?
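
To make the question concrete, here is a minimal sketch of the behaviour I am after: use a saved offset for a partition when one exists, and only fall back to "auto.offset.reset" otherwise. This is not Spark's code. TopicPartition is a local stand-in for Kafka's TopicAndPartition, and StartingOffsets.choose and its parameters are hypothetical names I made up for illustration. The saved map is assumed to come from wherever the offsets were stored (HDFS or some other datastore).

```scala
// Local stand-in for kafka.common.TopicAndPartition, so the sketch has no dependencies.
final case class TopicPartition(topic: String, partition: Int)

object StartingOffsets {
  /** Pick a starting offset for every partition.
    * A saved offset wins; otherwise fall back to auto.offset.reset,
    * where "smallest" means earliest and anything else means latest
    * (the same rule as the snippet quoted above).
    * Assumes earliest and latest contain an entry for every partition.
    */
  def choose(
      partitions: Set[TopicPartition],
      saved: Map[TopicPartition, Long],    // hypothetical: offsets loaded from an external store
      earliest: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long],
      reset: Option[String]): Map[TopicPartition, Long] = {
    val fallback =
      if (reset.map(_.toLowerCase) == Some("smallest")) earliest else latest
    partitions.map(tp => tp -> saved.getOrElse(tp, fallback(tp))).toMap
  }
}
```

For example, with a saved offset of 42 for partition 0 only, and reset set to "largest", partition 0 would resume at 42 while partition 1 would start at its latest offset:

```scala
val p0 = TopicPartition("events", 0)
val p1 = TopicPartition("events", 1)
val start = StartingOffsets.choose(
  Set(p0, p1),
  saved    = Map(p0 -> 42L),
  earliest = Map(p0 -> 0L, p1 -> 0L),
  latest   = Map(p0 -> 100L, p1 -> 90L),
  reset    = Some("largest"))
```

As far as I can tell, a map of this shape is what I would have to hand to the stream at creation time, but I do not see where the code quoted above would accept it.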

Alexey Ponkin



