Have you looked at the driver and executor logs? Without seeing what's in the "do stuff with the DStream" section of your code, it's hard to say more. I'd suggest starting with a simpler job, e.g. one that does nothing but print each message, and verifying whether that checkpoints.
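Here is a minimal sketch of that kind of stripped-down test job. It assumes Spark 1.5 with the spark-streaming-kafka artifact (the Kafka 0.8 direct API) on the classpath, and it reuses the checkpoint directory, broker and topic from your mail. The object name `MinimalCheckpointTest` is hypothetical. All DStream setup happens inside the factory passed to `getOrCreate`, and the only output is the offset range of each batch plus a few printed messages. If "New Context" still prints on every restart, or files/checkpoint never gets anything beyond the UUID directory (metadata files named like checkpoint-<batch time> should show up once batches complete), then the problem is probably not in your processing code.

```scala
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical test object: a stripped-down version of the job in the question.
object MinimalCheckpointTest {
  // Same checkpoint directory, broker and topic as in the original mail
  val checkpointDir = "files/checkpoint"
  val kafkaParams = Map(
    "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
    "auto.offset.reset" -> "smallest")
  val kafkaTopics = Set("Bla")

  // Everything that defines the DStream graph lives inside the factory,
  // so Spark can rebuild it from the checkpoint on restart.
  def createStreamingContext(): StreamingContext = {
    println("New Context") // should only be printed on the very first run
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("CheckpointTest")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, kafkaTopics)

    // No real processing: log which Kafka offsets each batch covered...
    stream.foreachRDD { rdd =>
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
      }
    }
    // ...and print the first few messages of each batch
    stream.map { case (key, value) => s"$key -> ${value.length} bytes" }.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Stop and restart it a couple of times. On a successful recovery the first logged offset range should continue from where the previous run stopped instead of starting again at the smallest offset.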
On Fri, Nov 6, 2015 at 3:59 AM, Kathi Stutz <em...@kathistutz.de> wrote:
> Hi all,
>
> I want to load an InputDStream from a checkpoint, but it doesn't work, and
> after trying several things I have finally run out of ideas.
>
> So, here's what I do:
>
> 1. I create the streaming context, or load it from the checkpoint
> directory.
>
> def main(args: Array[String]) {
>   val ssc = StreamingContext.getOrCreate("files/checkpoint",
>     createStreamingContext _)
>   ssc.start()
>   ssc.awaitTermination()
> }
>
> 2. In the function createStreamingContext(), I first create a new Spark
> config...
>
> def createStreamingContext(): StreamingContext = {
>   println("New Context")
>
>   val conf = new SparkConf()
>     .setMaster("local[2]")
>     .setAppName("CheckpointTest")
>     .set("spark.streaming.kafka.maxRatePerPartition", "10000")
>
>   //...then I create the streaming context...
>   val ssc = new StreamingContext(conf, Seconds(1))
>
>   var offsetRanges = Array[OffsetRange]()
>   val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
>     "auto.offset.reset" -> "smallest") //Start from beginning
>   val kafkaTopics = Set("Bla")
>
>   //...then I go and get a DStream from Kafka...
>   val directKafkaStream = KafkaUtils.createDirectStream[String,
>     Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics)
>
>   //...I do stuff with the DStream
>   ...
>
>   //...and finally I checkpoint the streaming context and return it
>   ssc.checkpoint("files/checkpoint")
>   ssc
> }
>
> 3. When I start the application, after a while it creates an empty
> directory in files/checkpoint/ with a name like
> 23207ed2-c021-4a1d-8af8-0620a19a8665. But that's all; no other files or
> directories ever appear there.
>
> 4. When I stop the application and restart it, it creates a new streaming
> context each time. (This also means it starts the Kafka streaming from the
> smallest available offset again and again. The main reason I wanted to use
> checkpoints was so that I wouldn't have to keep track of Kafka offsets.)
>
> So, what am I doing wrong?
>
> Thanks a lot!
>
> Kathi