Hi,

If you wish to read from checkpoints, you need to use StreamingContext.getOrCreate(checkpointDir, functionToCreateContext) to create the streaming context that you pass in to KafkaUtils.createDirectStream(...). You may refer to http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing for an example.
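Roughly, it looks like this (a minimal Scala sketch following the checkpointing pattern in the programming guide; the checkpoint path, broker address, and topic name are placeholders you'd replace with your own):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical checkpoint directory for illustration.
val checkpointDir = "hdfs:///tmp/spark-checkpoint"

def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("DirectKafkaWithCheckpoint")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // enable checkpointing

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val topics = Set("my-topic")

  // On the first run this honors auto.offset.reset; on recovery the
  // context (including offsets) is restored from the checkpoint and
  // this function is not called at all.
  val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))
  ssc
}

// getOrCreate recovers the context (and saved offsets) from the
// checkpoint directory if one exists; otherwise it builds a fresh one.
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()
```

Note that all stream setup has to happen inside functionToCreateContext; if you create the DStream outside it, it won't be part of the recovered context.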
HTH,
Deng

On Tue, Nov 24, 2015 at 5:46 PM, ponkin <alexey.pon...@ya.ru> wrote:
> Hi,
>
> When I create a stream with KafkaUtils.createDirectStream I can explicitly
> define the position "largest" or "smallest" - where to read the topic from.
> What if I have previous checkpoints (in HDFS for example) with offsets,
> and I want to start reading from the last checkpoint?
> In the source code of KafkaUtils.createDirectStream I see the following:
>
> val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
>
> (for {
>   topicPartitions <- kc.getPartitions(topics).right
>   leaderOffsets <- (if (reset == Some("smallest")) {
>     kc.getEarliestLeaderOffsets(topicPartitions)
>   } else {
>     kc.getLatestLeaderOffsets(topicPartitions)
>   }).right
> ...
>
> So it turns out that I have no option to start reading from
> checkpoints (and offsets)?
> Am I right?
> How can I force Spark to start reading from saved offsets (in
> checkpoints)? Is it possible at all, or do I need to store offsets in an
> external datastore?
>
> Alexey Ponkin
>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.