Hi,

If you wish to read from checkpoints, you need to use
StreamingContext.getOrCreate(checkpointDir, functionToCreateContext) to
create the streaming context that you pass to
KafkaUtils.createDirectStream(...). When a checkpoint exists in
checkpointDir, the context (including the saved Kafka offsets) is restored
from it; otherwise functionToCreateContext is called to build a fresh one.
See
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
for an example.
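A minimal sketch of that pattern, assuming the Spark 1.x KafkaUtils API; the checkpoint path, broker list, and topic name below are placeholders you'd replace with your own:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///user/me/checkpoints" // placeholder path

// Called only when no checkpoint exists. On restart, the DStream graph
// (including the Kafka offsets stored in the checkpoint) is restored
// instead, so "auto.offset.reset" is only consulted on a cold start.
def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-direct-example")
  val ssc = new StreamingContext(conf, Seconds(10))

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))

  // Define all transformations/outputs inside this function so they are
  // part of the checkpointed graph.
  stream.foreachRDD(rdd => println(rdd.count()))

  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()
```

Note that everything, including the createDirectStream call, must go inside the creating function; if you build the stream outside it, the restored context won't know about it.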

HTH,
Deng

On Tue, Nov 24, 2015 at 5:46 PM, ponkin <alexey.pon...@ya.ru> wrote:

> Hi,
>
> When I create a stream with KafkaUtils.createDirectStream I can explicitly
> define the position, "largest" or "smallest", from which to read the topic.
> What if I have previous checkpoints (in HDFS, for example) with offsets,
> and I want to start reading from the last checkpoint?
> In source code of KafkaUtils.createDirectStream I see the following
>
>   val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
>
>   (for {
>     topicPartitions <- kc.getPartitions(topics).right
>     leaderOffsets <- (if (reset == Some("smallest")) {
>       kc.getEarliestLeaderOffsets(topicPartitions)
>     } else {
>       kc.getLatestLeaderOffsets(topicPartitions)
>     }).right
> ...
>
> So it turns out that I have no option to start reading from
> checkpoints (and their offsets)?
> Am I right?
> How can I force Spark to start reading from saved offsets (in
> checkpoints)? Is it possible at all, or do I need to store offsets in an
> external datastore?
>
> Alexey Ponkin
>
>
