Why are you checkpointing the direct Kafka stream? It serves no purpose.

TD
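For reference, a minimal sketch of the setup TD is describing, reusing the sparkConf, params, kafkaParams, and topicsSet names from the code quoted later in this thread: the context-level checkpoint directory is kept for driver recovery, and the messages.checkpoint(...) call on the direct stream is simply dropped.

    // Sketch only (Spark 1.3 Java API, names borrowed from the thread's code).
    // The streaming context is checkpointed for driver/metadata recovery...
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));
    jssc.checkpoint(params.getCheckpointDir());

    // ...but the direct Kafka stream itself is not checkpointed. Its RDDs can be
    // recomputed from the Kafka offset ranges saved in the checkpoint metadata,
    // so an explicit messages.checkpoint(...) adds I/O without adding recoverability.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams,
        topicsSet);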
On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> I just disabled checkpointing in our consumers and I can see that the batch duration millis set to 20 seconds is now being honored.
>
> Why would that be the case?
>
> And how can we "untie" batch duration millis from checkpointing?
>
> Thanks.
>
> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Well, I'm not sure why you're checkpointing messages.
>>
>> I'd also put in some logging to see what values are actually being read out of your params object for the various settings.
>>
>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>
>>> I've stopped the jobs, the workers, and the master. Deleted the contents of the checkpointing dir. Then restarted master, workers, and consumers.
>>>
>>> I'm seeing the job in question still firing every 10 seconds. I'm seeing the 10 seconds in the Spark Jobs GUI page as well as in our logs. Seems quite strange given that the jobs used to fire every 1 second; we've switched to 10, now trying to switch to 20, and the batch duration millis is not changing.
>>>
>>> Does anything stand out in the code perhaps?
>>>
>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Have you tried deleting or moving the contents of the checkpoint directory and restarting the job?
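The behavior described at the top of the thread (the 20-second setting only being honored once checkpointing is disabled) matches how JavaStreamingContext.getOrCreate works: when checkpoint data already exists, the factory is never invoked and the recovered context keeps the batch interval that was serialized into the checkpoint. A minimal sketch of that control flow, reusing the createContext, sparkConf, and params names from the code quoted further down:

    // Sketch only: illustrates why a new batchDurationMillis value is ignored
    // while old checkpoint data is still present in params.getCheckpointDir().
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        // Only runs when NO valid checkpoint exists in the directory.
        // This is the only code path that reads params.getBatchDurationMillis().
        return createContext(sparkConf, params);
      }
    };

    // If the directory already holds checkpoint data, Spark deserializes the old
    // StreamingContext, including its original batch interval, and returns it;
    // the factory above (and any new settings) is never consulted.
    JavaStreamingContext jssc =
        JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);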
>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Sorry, more relevant code below:
>>>>>
>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>     createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params);
>>>>> jssc.start();
>>>>> jssc.awaitTermination();
>>>>> jssc.close();
>>>>> ……………………………..
>>>>> private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
>>>>>   JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>>>>     @Override
>>>>>     public JavaStreamingContext create() {
>>>>>       return createContext(sparkConf, params);
>>>>>     }
>>>>>   };
>>>>>   return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
>>>>> }
>>>>>
>>>>> private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
>>>>>   // Create context with the specified batch interval, in milliseconds.
>>>>>   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>       Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>
>>>>>   // Set the checkpoint directory, if we're checkpointing.
>>>>>   if (params.isCheckpointed()) {
>>>>>     jssc.checkpoint(params.getCheckpointDir());
>>>>>   }
>>>>>
>>>>>   Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));
>>>>>
>>>>>   // Set the Kafka parameters.
>>>>>   Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>   kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
>>>>>   if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>>     kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
>>>>>   }
>>>>>
>>>>>   // Create the direct Kafka stream with the brokers and the topic.
>>>>>   JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>       jssc,
>>>>>       String.class,
>>>>>       String.class,
>>>>>       StringDecoder.class,
>>>>>       StringDecoder.class,
>>>>>       kafkaParams,
>>>>>       topicsSet);
>>>>>
>>>>>   // See if there's an override of the default checkpoint duration.
>>>>>   if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>>>     messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>>>>>   }
>>>>>
>>>>>   JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>>>>>     @Override
>>>>>     public String call(Tuple2<String, String> tuple2) {
>>>>>       return tuple2._2();
>>>>>     }
>>>>>   });
>>>>>
>>>>>   messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>     @Override
>>>>>     public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>       ProcessPartitionFunction func = new ProcessPartitionFunction(params);
>>>>>       rdd.foreachPartition(func);
>>>>>       return null;
>>>>>     }
>>>>>   });
>>>>>
>>>>>   return jssc;
>>>>> }
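One way to act on Cody's logging suggestion, given the code above, would be to log from inside the factory and again after getOrCreate returns; if the "Creating a NEW StreamingContext" line never appears in the driver log, the context was recovered from the checkpoint and the new settings were never read. The log calls below are illustrative additions (assuming a log field like the one used in ProcessPartitionFunction is available here), not part of the original code:

    // Hypothetical diagnostic variant of createCheckpointedContext().
    private JavaStreamingContext createCheckpointedContext(final SparkConf sparkConf, final Parameters params) {
      JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
          // Runs only when no valid checkpoint exists in the directory.
          log.info("Creating a NEW StreamingContext: batchDurationMillis={}, checkpointDir={}",
              params.getBatchDurationMillis(), params.getCheckpointDir());
          return createContext(sparkConf, params);
        }
      };
      JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
      log.info("getOrCreate returned; configured batchDurationMillis={}", params.getBatchDurationMillis());
      return jssc;
    }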
>>>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> I'd think that we wouldn't be "accidentally recovering from checkpoint" hours or even days after the consumers have been restarted; plus, the content is the fresh content that I'm feeding, not some content that had been fed before the last restart.
>>>>>>
>>>>>> The code is basically as follows:
>>>>>>
>>>>>> SparkConf sparkConf = createSparkConf(...);
>>>>>> // We'd be 'checkpointed' because we specify a checkpoint directory, which makes isCheckpointed true.
>>>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>>     createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params);
>>>>>> jssc.start();
>>>>>> jssc.awaitTermination();
>>>>>> jssc.close();
>>>>>>
>>>>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> Are you sure you are not accidentally recovering from checkpoint? How are you using StreamingContext.getOrCreate() in your code?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Tathagata,
>>>>>>>>
>>>>>>>> In our logs I see the batch duration millis being set first to 10 and then to 20 seconds. I don't see the 20 being reflected later during ingestion.
>>>>>>>>
>>>>>>>> In the Spark UI under Streaming I see the output below; notice the *10 second* batch interval. Can you think of a reason why it's stuck at 10? It used to be 1 second, by the way; then somehow over the course of a few restarts we managed to get it to be 10 seconds. Now it won't get reset to 20 seconds. Any ideas?
>>>>>>>>
>>>>>>>> Streaming
>>>>>>>>
>>>>>>>> - Started at: Thu Sep 03 10:59:03 EDT 2015
>>>>>>>> - Time since start: 1 day 8 hours 44 minutes
>>>>>>>> - Network receivers: 0
>>>>>>>> - Batch interval: 10 seconds
>>>>>>>> - Processed batches: 11790
>>>>>>>> - Waiting batches: 0
>>>>>>>> - Received records: 0
>>>>>>>> - Processed records: 0
>>>>>>>>
>>>>>>>> Statistics over last 100 processed batches
>>>>>>>>
>>>>>>>> Receiver Statistics: no receivers
>>>>>>>>
>>>>>>>> Batch Processing Statistics:
>>>>>>>>
>>>>>>>> Metric           | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
>>>>>>>> Processing Time  | 23 ms      | 7 ms    | 10 ms           | 11 ms  | 14 ms           | 172 ms
>>>>>>>> Scheduling Delay | 1 ms       | 0 ms    | 0 ms            | 0 ms   | 1 ms            | 2 ms
>>>>>>>> Total Delay      | 24 ms      | 8 ms    | 10 ms           | 12 ms  | 14 ms           | 173 ms
>>>>>>>>
>>>>>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> Could you see what the Streaming tab in the Spark UI says? It should show the underlying batch duration of the StreamingContext, the details of when each batch starts, etc.
>>>>>>>>>
>>>>>>>>> BTW, it seems that the 5.6 or 6.8 second delay is present only when data is present (that is, *Documents processed > 0*).
>>>>>>>>>
>>>>>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Tathagata,
>>>>>>>>>>
>>>>>>>>>> Checkpointing is turned on but we were not recovering. I'm looking at the logs now, feeding fresh content hours after the restart. Here's a snippet:
>>>>>>>>>>
>>>>>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>>>>>> ...
>>>>>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>>>>>
>>>>>>>>>> Interestingly, the fresh content (4 documents) is fed about 5.6 seconds after the previous batch, not 10 seconds. The second fresh batch comes in 6.8 seconds after the previous empty one.
>>>>>>>>>>
>>>>>>>>>> Granted, the log message is printed after iterating over the messages, which may account for some of the time differences. But generally, looking at the log messages printed before we iterate, it's still 10 seconds each time, not the 20 that batchDurationMillis is currently set to.
>>>>>>>>>>
>>>>>>>>>> Code:
>>>>>>>>>>
>>>>>>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....);
>>>>>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>>>>>
>>>>>>>>>> JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>>>>>>>>>>   @Override
>>>>>>>>>>   public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>     return tuple2._2();
>>>>>>>>>>   }
>>>>>>>>>> });
>>>>>>>>>>
>>>>>>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>>   @Override
>>>>>>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>>>>>     rdd.foreachPartition(func);
>>>>>>>>>>     return null;
>>>>>>>>>>   }
>>>>>>>>>> });
>>>>>>>>>>
>>>>>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>>>>>
>>>>>>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>>>>>>   log.info("Starting data partition processing. AppName={}, topic={}...", appName, topic);
>>>>>>>>>>   // ... iterate ...
>>>>>>>>>>   log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.", appName, topic, docCount);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Any ideas? Thanks.
>>>>>>>>>>
>>>>>>>>>> - Dmitry
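Since the "Documents processed" lines are written at the end of per-partition processing, their timestamps only indirectly reflect the batch interval. A more direct check, sketched below against the same Spark 1.3 Java API and the names from the code above, is the two-argument foreachRDD overload that also receives the batch Time, so the driver log records exactly when each batch fires:

    // Sketch: log the batch time on the driver instead of inferring it from
    // per-partition log timestamps (needs org.apache.spark.streaming.Time and
    // org.apache.spark.api.java.function.Function2 imports).
    messageBodies.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd, Time batchTime) throws Exception {
        log.info("Batch fired at {} with {} partitions", batchTime, rdd.partitions().size());
        ProcessPartitionFunction func = new ProcessPartitionFunction(params);
        rdd.foreachPartition(func);
        return null;
      }
    });

That would show whether batches really are being generated every 10 seconds, regardless of when documents happen to arrive within a batch.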
>>>>>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you accidentally recovering from checkpoint files which have 10 seconds as the batch interval?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm seeing an oddity where I initially set the batchDurationMillis to 1 second and it works fine:
>>>>>>>>>>>>
>>>>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>>>>     Durations.milliseconds(batchDurationMillis));
>>>>>>>>>>>>
>>>>>>>>>>>> Then I tried changing the value to 10 seconds. The change didn't seem to take. I've bounced the Spark workers and the consumers, and now I'm seeing RDDs coming in about every 10 seconds (not always exactly 10 seconds according to the logs).
>>>>>>>>>>>>
>>>>>>>>>>>> However, now I'm trying to change the value to 20 seconds and it's just not taking. I've bounced the Spark master, workers, and consumers, and the value seems "stuck" at 10 seconds.
>>>>>>>>>>>>
>>>>>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> - Dmitry
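Pulling the thread together: with getOrCreate, a changed batch interval can only take effect when no usable checkpoint data exists in the directory being passed in. One possible way to make that explicit (a sketch only; the interval-suffixed directory name is an assumption of this example, not something the thread settled on) is to derive the checkpoint path from the configured interval, so a new interval never recovers a context written with the old one:

    // Sketch only: avoid silently recovering an old batch interval by tying the
    // checkpoint location to the configured interval. The "<dir>-<interval>"
    // naming convention is illustrative.
    private JavaStreamingContext createCheckpointedContext(final SparkConf sparkConf, final Parameters params) {
      final String checkpointDir =
          params.getCheckpointDir() + "-" + params.getBatchDurationMillis();
      JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
          JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
              Durations.milliseconds(params.getBatchDurationMillis()));
          jssc.checkpoint(checkpointDir);
          return jssc;
        }
      };
      return JavaStreamingContext.getOrCreate(checkpointDir, factory);
    }

The trade-off is that a fresh checkpoint directory also discards the saved Kafka offsets, so on the first run with a new interval the stream starts from whatever auto.offset.reset dictates.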