Have you tried deleting or moving the contents of the checkpoint directory and restarting the job?
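If the checkpoint directory lives on a local filesystem, a sketch like the one below would do it before the next start; the path is illustrative, and for an HDFS checkpoint directory the Hadoop FileSystem API or "hdfs dfs -rm -r" is the equivalent. Note that this discards the saved offsets and state, so consumption resumes according to auto.offset.reset.

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    // Wipe the checkpoint directory so the next start builds a brand-new StreamingContext
    // instead of recovering the old one (and its old batch interval).
    public final class ClearCheckpointDir {
        public static void main(String[] args) throws IOException {
            File checkpointDir = new File("/tmp/spark/checkpoints/my-consumer");  // illustrative path
            if (checkpointDir.exists()) {
                FileUtils.deleteDirectory(checkpointDir);
            }
        }
    }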
On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> Sorry, more relevant code below:
>
> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
> JavaStreamingContext jssc = params.isCheckpointed()
>     ? createCheckpointedContext(sparkConf, params)
>     : createContext(sparkConf, params);
> jssc.start();
> jssc.awaitTermination();
> jssc.close();
>
> ……………………………..
>
> private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
>   JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>     @Override
>     public JavaStreamingContext create() {
>       return createContext(sparkConf, params);
>     }
>   };
>   return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
> }
>
> private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
>   // Create context with the specified batch interval, in milliseconds.
>   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>       Durations.milliseconds(params.getBatchDurationMillis()));
>
>   // Set the checkpoint directory, if we're checkpointing
>   if (params.isCheckpointed()) {
>     jssc.checkpoint(params.getCheckpointDir());
>   }
>
>   Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));
>
>   // Set the Kafka parameters.
>   Map<String, String> kafkaParams = new HashMap<String, String>();
>   kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
>   if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>     kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
>   }
>
>   // Create direct Kafka stream with the brokers and the topic.
>   JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>       jssc,
>       String.class,
>       String.class,
>       StringDecoder.class,
>       StringDecoder.class,
>       kafkaParams,
>       topicsSet);
>
>   // See if there's an override of the default checkpoint duration.
>   if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>     messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>   }
>
>   JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>     @Override
>     public String call(Tuple2<String, String> tuple2) {
>       return tuple2._2();
>     }
>   });
>
>   messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     @Override
>     public Void call(JavaRDD<String> rdd) throws Exception {
>       ProcessPartitionFunction func = new ProcessPartitionFunction(params);
>       rdd.foreachPartition(func);
>       return null;
>     }
>   });
>
>   return jssc;
> }
>
> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>
>> I'd think that we wouldn't be "accidentally recovering from checkpoint" hours or even days
>> after consumers have been restarted, plus the content is the fresh content that I'm feeding,
>> not some content that had been fed before the last restart.
>>
>> The code is basically as follows:
>>
>> SparkConf sparkConf = createSparkConf(...);
>> // We'd be 'checkpointed' because we specify a checkpoint directory which makes isCheckpointed true
>> JavaStreamingContext jssc = params.isCheckpointed()
>>     ? createCheckpointedContext(sparkConf, params)
>>     : createContext(sparkConf, params);
>> jssc.start();
>> jssc.awaitTermination();
>> jssc.close();
>>
>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> Are you sure you are not accidentally recovering from checkpoint? How are you using
>>> StreamingContext.getOrCreate() in your code?
>>>
>>> TD
>>>
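That getOrCreate() call is the crux: if usable checkpoint data exists under params.getCheckpointDir(), the factory's create() is never invoked and the recovered context keeps whatever batch interval was checkpointed. A minimal way to confirm which path was taken (the flag and warning below are illustrative, not part of the code above):

    import java.util.concurrent.atomic.AtomicBoolean;

    final AtomicBoolean createdFresh = new AtomicBoolean(false);
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
            // Only runs when no usable checkpoint exists under the directory.
            createdFresh.set(true);
            return createContext(sparkConf, params);
        }
    };
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
    if (!createdFresh.get()) {
        log.warn("Recovered context from checkpoint {}; batch interval, DStream graph and offsets "
            + "come from the checkpoint, not from the current configuration.", params.getCheckpointDir());
    }

If that warning fires after every restart, the job is running on the checkpointed 10-second interval regardless of what batchDurationMillis is currently set to.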
>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>
>>>> Tathagata,
>>>>
>>>> In our logs I see the batch duration millis being set first to 10 then to 20 seconds. I don't
>>>> see the 20 being reflected later during ingestion.
>>>>
>>>> In the Spark UI under Streaming I see the below output, notice the *10 second* Batch
>>>> interval. Can you think of a reason why it's stuck at 10? It used to be 1 second by the way,
>>>> then somehow over the course of a few restarts we managed to get it to be 10 seconds. Now it
>>>> won't get reset to 20 seconds. Any ideas?
>>>>
>>>> Streaming
>>>>
>>>> - *Started at:* Thu Sep 03 10:59:03 EDT 2015
>>>> - *Time since start:* 1 day 8 hours 44 minutes
>>>> - *Network receivers:* 0
>>>> - *Batch interval:* 10 seconds
>>>> - *Processed batches:* 11790
>>>> - *Waiting batches:* 0
>>>> - *Received records:* 0
>>>> - *Processed records:* 0
>>>>
>>>> Statistics over last 100 processed batches
>>>>
>>>> Receiver Statistics: no receivers.
>>>>
>>>> Batch Processing Statistics:
>>>>
>>>> Metric            Last batch  Minimum  25th percentile  Median  75th percentile  Maximum
>>>> Processing Time   23 ms       7 ms     10 ms            11 ms   14 ms            172 ms
>>>> Scheduling Delay  1 ms        0 ms     0 ms             0 ms    1 ms             2 ms
>>>> Total Delay       24 ms       8 ms     10 ms            12 ms   14 ms            173 ms
>>>>
>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> Could you see what the streaming tab in the Spark UI says? It should show the underlying
>>>>> batch duration of the StreamingContext, the details of when the batch starts, etc.
>>>>>
>>>>> BTW, it seems that the 5.6 or 6.8 seconds delay is present only when data is present
>>>>> (that is, *Documents processed: > 0*).
>>>>>
>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> Tathagata,
>>>>>>
>>>>>> Checkpointing is turned on but we were not recovering. I'm looking at the logs now,
>>>>>> feeding fresh content hours after the restart. Here's a snippet:
>>>>>>
>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>> ...
>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>
>>>>>> Interestingly, the fresh content (4 documents) is fed about 5.6 seconds after the previous
>>>>>> batch, not 10 seconds. The second fresh batch comes in 6.8 seconds after the previous
>>>>>> empty one.
>>>>>>
>>>>>> Granted, the log message is printed after iterating over the messages, which may account
>>>>>> for some time differences. But generally, looking at the log messages being printed before
>>>>>> we iterate, it's still 10 seconds each time, not 20, which is what batchDurationMillis is
>>>>>> currently set to.
>>>>>>
>>>>>> Code:
>>>>>>
>>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....);
>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>
>>>>>> JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>>>>>>   @Override
>>>>>>   public String call(Tuple2<String, String> tuple2) {
>>>>>>     return tuple2._2();
>>>>>>   }
>>>>>> });
>>>>>>
>>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>   @Override
>>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>     rdd.foreachPartition(func);
>>>>>>     return null;
>>>>>>   }
>>>>>> });
>>>>>>
>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>
>>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>>   log.info("Starting data partition processing. AppName={}, topic={}...", appName, topic);
>>>>>>   // ... iterate ...
>>>>>>   log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.",
>>>>>>       appName, topic, docCount);
>>>>>> }
>>>>>>
>>>>>> Any ideas? Thanks.
>>>>>>
>>>>>> - Dmitry
>>>>>>
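Worth noting: messages.checkpoint(Durations.milliseconds(checkpointMillis)) only controls how often that DStream's data is checkpointed; it has no effect on the batch interval, which is fixed when the StreamingContext is constructed (or restored from a checkpoint). The usual guidance is to keep the DStream checkpoint interval at roughly 5-10x the batch interval rather than tuning the two independently. A sketch, reusing params and sparkConf from the code above and assuming kafkaParams and topicsSet are built as in createContext():

    // Derive the DStream checkpoint cadence from the batch interval (5x here is just the
    // commonly suggested multiple, not a value taken from the thread).
    long batchMillis = params.getBatchDurationMillis();   // e.g. 20000
    long checkpointMillis = batchMillis * 5;              // DStream checkpoint cadence, not the batch interval

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(batchMillis));             // the batch interval is set once, here
    jssc.checkpoint(params.getCheckpointDir());           // where checkpoint data is written

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);
    messages.checkpoint(Durations.milliseconds(checkpointMillis));  // how often this DStream checkpoints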
>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> Are you accidentally recovering from checkpoint files which have 10 seconds as the batch
>>>>>>> interval?
>>>>>>>
>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm seeing an oddity where I initially set the batchDurationMillis to 1 second and it
>>>>>>>> works fine:
>>>>>>>>
>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>     Durations.milliseconds(batchDurationMillis));
>>>>>>>>
>>>>>>>> Then I tried changing the value to 10 seconds. The change didn't seem to take. I've
>>>>>>>> bounced the Spark workers and the consumers, and now I'm seeing RDDs coming in about
>>>>>>>> once every 10 seconds (not always exactly 10 seconds according to the logs).
>>>>>>>>
>>>>>>>> However, now I'm trying to change the value to 20 seconds and it's just not taking. I've
>>>>>>>> bounced the Spark master, workers, and consumers, and the value seems "stuck" at 10
>>>>>>>> seconds.
>>>>>>>>
>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> - Dmitry
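To tie the thread together: the batch interval is part of what is serialized into the checkpoint, so once getOrCreate() recovers a context, the Durations.milliseconds(batchDurationMillis) call inside createContext() is never evaluated, which is consistent with the interval staying at 10 seconds across restarts. Changing the interval therefore requires starting from a non-recovered context: either clear the checkpoint directory as suggested at the top of the thread, or, as a workaround sketch that is not from the thread, derive the checkpoint path from the configured interval so that a changed interval can never silently reuse an old checkpoint (old offsets and state under the previous path are abandoned):

    // Workaround sketch (illustrative): key the checkpoint path on the batch interval,
    // so a new interval always starts from a fresh checkpoint.
    final String checkpointDir =
        params.getCheckpointDir() + "/batch-" + params.getBatchDurationMillis() + "ms";

    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        JavaStreamingContext context = new JavaStreamingContext(sparkConf,
            Durations.milliseconds(params.getBatchDurationMillis()));
        context.checkpoint(checkpointDir);  // same derived path as passed to getOrCreate() below
        // ... build the Kafka direct stream and the rest of the pipeline here, as in createContext() ...
        return context;
      }
    };
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);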