Have you looked at the driver and executor logs?

Without being able to see what's in the "do stuff with the DStream" section
of code... I'd suggest starting with a simpler job, e.g. one that does nothing
but print each message, and verifying whether it checkpoints.
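A minimal sketch of such a job, reusing the broker address, topic, and checkpoint path from your quoted code below (those values are assumptions taken from your mail, adjust for your environment), could look like this. Note that this is based on the Spark 1.x Kafka 0.8 direct-stream API, same as your code:

```scala
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointSanityCheck {

  def createStreamingContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("CheckpointSanityCheck")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Set the checkpoint directory before declaring any output operations.
    ssc.checkpoint("files/checkpoint")

    // Broker and topic taken from the quoted code; adjust as needed.
    val kafkaParams = Map(
      "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
      "auto.offset.reset" -> "smallest")
    val stream = KafkaUtils.createDirectStream[
      String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, Set("Bla"))

    // Do nothing but print each message.
    stream.map { case (k, v) => (k, new String(v)) }.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Either restores the context from files/checkpoint or builds a new one.
    val ssc = StreamingContext.getOrCreate("files/checkpoint", createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

If this version writes checkpoint data and restores on restart ("New Context" is only printed the first time), the problem is in the omitted "do stuff" section; if it doesn't, the problem is in the setup.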

On Fri, Nov 6, 2015 at 3:59 AM, Kathi Stutz <em...@kathistutz.de> wrote:

> Hi all,
>
> I want to load an InputDStream from a checkpoint, but it doesn't work, and
> after trying several things I have finally run out of ideas.
>
> So, here's what I do:
>
> 1. I create the streaming context - or load it from the checkpoint
> directory.
>
>   def main(args: Array[String]) {
>     val ssc = StreamingContext.getOrCreate("files/checkpoint", createStreamingContext _)
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
> 2. In the function createStreamingContext(), I first create a new Spark
> config...
>
>   def createStreamingContext(): StreamingContext = {
>     println("New Context")
>
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("CheckpointTest")
>       .set("spark.streaming.kafka.maxRatePerPartition", "10000")
>
> //...then I create the streaming context...
>     val ssc = new StreamingContext(conf, Seconds(1))
>
>     var offsetRanges = Array[OffsetRange]()
>     val kafkaParams = Map("metadata.broker.list" ->
> "sandbox.hortonworks.com:6667",
>       "auto.offset.reset" -> "smallest") //Start from beginning
>     val kafkaTopics = Set("Bla")
>
> //...then I go and get a DStream from Kafka...
>     val directKafkaStream = KafkaUtils.createDirectStream[
>         String, Array[Byte], StringDecoder, DefaultDecoder](
>         ssc, kafkaParams, kafkaTopics)
>
> //...I do stuff with the DStream
>     ...
>
> //...and finally I checkpoint the streaming context and return it
>     ssc.checkpoint("files/checkpoint")
>     ssc
> }
>
> 3. When I start the application, after a while it creates in
> files/checkpoint/ an empty directory with a name like
> 23207ed2-c021-4a1d-8af8-0620a19a8665. But that's all, no more files or
> directories or whatever appear there.
>
> 4. When I stop the application and restart it, it creates a new streaming
> context each time. (This also means it starts the Kafka streaming from the
> smallest available offset again and again. The main reason for me to use
> checkpoints was to avoid having to keep track of Kafka offsets myself.)
>
> So, what am I doing wrong?
>
> Thanks a lot!
>
> Kathi
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
