Well, you are returning JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory). That loads the checkpointed context regardless of whether params.isCheckpointed() is true.
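Since the factory's create() is only invoked when no usable checkpoint exists, a log line inside it makes this behavior visible. Below is a minimal sketch of the same factory (assuming an slf4j-style log field is available): if the line never appears on restart, the context, batch duration included, was recovered from the checkpoint directory rather than built from the current settings.

  private JavaStreamingContext createCheckpointedContext(final SparkConf sparkConf,
                                                         final Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        // Runs only on a fresh start, i.e. when no checkpoint data exists.
        log.info("No checkpoint found; creating context with batchDurationMillis={}",
            params.getBatchDurationMillis());
        return createContext(sparkConf, params);
      }
    };
    // If checkpoint data exists under getCheckpointDir(), this recovers the
    // old context -- batch duration included -- and never calls create().
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }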
On Tue, Sep 8, 2015 at 8:28 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> That is good to know. However, that doesn't change the problem I'm seeing,
> which is that, even with that piece of code (stream.checkpoint()) commented
> out, the batch duration millis aren't getting changed unless I take
> checkpointing out completely.
>
> In other words, with this commented out:
>
> // if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
> //   messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
> // }
>
> the "stickiness" of the batch duration millis value doesn't change.
> However, if the context is not checkpointed, the problem goes away and the
> batch duration millis setting is properly updated.
>
> I'll take the commented-out piece out; however, I still need to figure out
> how to fix the batch duration millis so that I can also keep the
> checkpointed context.
>
> The checkpointed context is basically created as I stated before; note the
> invocation of the checkpoint() method:
>
> private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
>   JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>     @Override
>     public JavaStreamingContext create() {
>       return createContext(sparkConf, params);
>     }
>   };
>   return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
> }
>
> private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
>   // Create context with the specified batch interval, in milliseconds.
>   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>       Durations.milliseconds(params.getBatchDurationMillis()));
>   // Set the checkpoint directory, if we're checkpointing.
>   if (params.isCheckpointed()) {
>     jssc.checkpoint(params.getCheckpointDir());
>   }
>   .......
>
> On Tue, Sep 8, 2015 at 11:14 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Calling directKafkaStream.checkpoint() will make the system write the raw
>> Kafka data into HDFS files (that is, RDD checkpointing). This is
>> completely unnecessary with the direct Kafka stream, because it already
>> tracks the offsets of the data in each batch (that checkpointing is
>> enabled using streamingContext.checkpoint(checkpointDir)) and can recover
>> from failure by reading the exact same data back from Kafka.
>>
>> TD
>>
>> On Tue, Sep 8, 2015 at 4:38 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>
>>> >> Why are you checkpointing the direct kafka stream? It serves no
>>> >> purpose.
>>>
>>> Could you elaborate on what you mean?
>>>
>>> Our goal is fault tolerance. If a consumer is killed or stopped
>>> midstream, we want to resume where we left off the next time the
>>> consumer is restarted.
>>>
>>> How would that be "not serving a purpose"? This is already working for
>>> us. From our testing, we indeed resume where we left off, which is not
>>> possible without checkpointing. If checkpointing is turned off, we'll
>>> resume with the latest Kafka topic entries, which would mean skipping
>>> over some entries.
>>>
>>> Please elaborate.
>>>
>>> We need both checkpointing and the ability to set the batch duration
>>> millis. The Spark API provides both capabilities, but somehow, if
>>> checkpointing is turned on, our batch duration millis are always set to
>>> 10 seconds internally by Spark. What is the resolution?
>>>
>>> On Tue, Sep 8, 2015 at 7:23 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> Why are you checkpointing the direct kafka stream? It serves no
>>>> purpose.
>>>>
>>>> TD
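To restate TD's distinction in code: the streaming context checkpoint (metadata) is what enables recovery for the direct Kafka stream, while per-DStream checkpointing writes the data itself. A hedged sketch using the thread's own variable names:

  // Metadata checkpointing: persists offsets and configuration to the
  // checkpoint directory so a restarted job can resume where it left off.
  // This is the only checkpointing the direct Kafka stream needs.
  jssc.checkpoint(params.getCheckpointDir());

  // RDD (data) checkpointing: writes the raw Kafka payloads out to HDFS.
  // Per TD, unnecessary for the direct stream -- the recovered offsets let
  // Spark re-read the exact same data from Kafka itself:
  // messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));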
>>>> On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> I just disabled checkpointing in our consumers and I can see that the
>>>>> batch duration millis set to 20 seconds is now being honored.
>>>>>
>>>>> Why would that be the case?
>>>>>
>>>>> And how can we "untie" batch duration millis from checkpointing?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> Well, I'm not sure why you're checkpointing messages.
>>>>>>
>>>>>> I'd also put in some logging to see what values are actually being
>>>>>> read out of your params object for the various settings.
>>>>>>
>>>>>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> I've stopped the jobs, the workers, and the master; deleted the
>>>>>>> contents of the checkpointing dir; then restarted the master,
>>>>>>> workers, and consumers.
>>>>>>>
>>>>>>> I'm seeing the job in question still firing every 10 seconds. I'm
>>>>>>> seeing the 10 seconds on the Spark Jobs GUI page as well as in our
>>>>>>> logs. It seems quite strange given that the jobs used to fire every
>>>>>>> 1 second; we switched to 10, are now trying to switch to 20, and the
>>>>>>> batch duration millis is not changing.
>>>>>>>
>>>>>>> Does anything stand out in the code, perhaps?
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Have you tried deleting or moving the contents of the checkpoint
>>>>>>>> directory and restarting the job?
>>>>>>>>
>>>>>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sorry, more relevant code below:
>>>>>>>>>
>>>>>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>>>>>> JavaStreamingContext jssc = params.isCheckpointed()
>>>>>>>>>     ? createCheckpointedContext(sparkConf, params)
>>>>>>>>>     : createContext(sparkConf, params);
>>>>>>>>> jssc.start();
>>>>>>>>> jssc.awaitTermination();
>>>>>>>>> jssc.close();
>>>>>>>>> ……………………………..
>>>>>>>>> private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
>>>>>>>>>   JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>>>>>>>>     @Override
>>>>>>>>>     public JavaStreamingContext create() {
>>>>>>>>>       return createContext(sparkConf, params);
>>>>>>>>>     }
>>>>>>>>>   };
>>>>>>>>>   return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
>>>>>>>>>   // Create context with the specified batch interval, in milliseconds.
>>>>>>>>>   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>       Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>>>>>
>>>>>>>>>   // Set the checkpoint directory, if we're checkpointing.
>>>>>>>>>   if (params.isCheckpointed()) {
>>>>>>>>>     jssc.checkpoint(params.getCheckpointDir());
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));
>>>>>>>>>
>>>>>>>>>   // Set the Kafka parameters.
>>>>>>>>>   Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>>>   kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
>>>>>>>>>   if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>>>>>>     kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   // Create a direct Kafka stream with the brokers and the topic.
>>>>>>>>>   JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>>>>>       jssc,
>>>>>>>>>       String.class,
>>>>>>>>>       String.class,
>>>>>>>>>       StringDecoder.class,
>>>>>>>>>       StringDecoder.class,
>>>>>>>>>       kafkaParams,
>>>>>>>>>       topicsSet);
>>>>>>>>>
>>>>>>>>>   // See if there's an override of the default checkpoint duration.
>>>>>>>>>   if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>>>>>>>     messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>>>>>>>>>     @Override
>>>>>>>>>     public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>       return tuple2._2();
>>>>>>>>>     }
>>>>>>>>>   });
>>>>>>>>>
>>>>>>>>>   messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>     @Override
>>>>>>>>>     public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>       ProcessPartitionFunction func = new ProcessPartitionFunction(params);
>>>>>>>>>       rdd.foreachPartition(func);
>>>>>>>>>       return null;
>>>>>>>>>     }
>>>>>>>>>   });
>>>>>>>>>
>>>>>>>>>   return jssc;
>>>>>>>>> }
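Following Cody's suggestion, one way to make an intentional interval change take effect is to clear the checkpoint directory before starting the context. A sketch, assuming the Hadoop FileSystem API is available to the driver and that losing the saved state on such a change is acceptable (clearCheckpointDir is a hypothetical helper, not part of the code above):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Wipe the checkpoint directory so that the next start builds a fresh
  // context with the newly configured batch duration. Note this also
  // discards the saved Kafka offsets, so consumption resumes according to
  // auto.offset.reset rather than where the old job left off.
  private static void clearCheckpointDir(String checkpointDir) throws IOException {
    Path path = new Path(checkpointDir);
    FileSystem fs = path.getFileSystem(new Configuration());
    if (fs.exists(path)) {
      fs.delete(path, true); // true = recursive
    }
  }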
>>>>>>>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'd think that we wouldn't be "accidentally recovering from
>>>>>>>>>> checkpoint" hours or even days after the consumers have been
>>>>>>>>>> restarted; plus, the content is the fresh content that I'm
>>>>>>>>>> feeding, not some content that had been fed before the last
>>>>>>>>>> restart.
>>>>>>>>>>
>>>>>>>>>> The code is basically as follows:
>>>>>>>>>>
>>>>>>>>>> SparkConf sparkConf = createSparkConf(...);
>>>>>>>>>> // We'd be 'checkpointed' because we specify a checkpoint
>>>>>>>>>> // directory, which makes isCheckpointed true.
>>>>>>>>>> JavaStreamingContext jssc = params.isCheckpointed()
>>>>>>>>>>     ? createCheckpointedContext(sparkConf, params)
>>>>>>>>>>     : createContext(sparkConf, params);
>>>>>>>>>> jssc.start();
>>>>>>>>>> jssc.awaitTermination();
>>>>>>>>>> jssc.close();
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you sure you are not accidentally recovering from checkpoint?
>>>>>>>>>>> How are you using StreamingContext.getOrCreate() in your code?
>>>>>>>>>>>
>>>>>>>>>>> TD
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Tathagata,
>>>>>>>>>>>>
>>>>>>>>>>>> In our logs I see the batch duration millis being set first to
>>>>>>>>>>>> 10 and then to 20 seconds. I don't see the 20 being reflected
>>>>>>>>>>>> later during ingestion.
>>>>>>>>>>>>
>>>>>>>>>>>> In the Spark UI under Streaming I see the output below; notice
>>>>>>>>>>>> the *10 second* batch interval. Can you think of a reason why
>>>>>>>>>>>> it's stuck at 10?
>>>>>>>>>>>> It used to be 1 second, by the way; then somehow over the
>>>>>>>>>>>> course of a few restarts we managed to get it to be 10 seconds.
>>>>>>>>>>>> Now it won't get reset to 20 seconds. Any ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> Streaming
>>>>>>>>>>>>
>>>>>>>>>>>> - *Started at:* Thu Sep 03 10:59:03 EDT 2015
>>>>>>>>>>>> - *Time since start:* 1 day 8 hours 44 minutes
>>>>>>>>>>>> - *Network receivers:* 0
>>>>>>>>>>>> - *Batch interval:* 10 seconds
>>>>>>>>>>>> - *Processed batches:* 11790
>>>>>>>>>>>> - *Waiting batches:* 0
>>>>>>>>>>>> - *Received records:* 0
>>>>>>>>>>>> - *Processed records:* 0
>>>>>>>>>>>>
>>>>>>>>>>>> Statistics over last 100 processed batches
>>>>>>>>>>>>
>>>>>>>>>>>> Receiver Statistics: no receivers
>>>>>>>>>>>>
>>>>>>>>>>>> Batch Processing Statistics:
>>>>>>>>>>>>
>>>>>>>>>>>> Metric           | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
>>>>>>>>>>>> Processing Time  | 23 ms      | 7 ms    | 10 ms           | 11 ms  | 14 ms           | 172 ms
>>>>>>>>>>>> Scheduling Delay | 1 ms       | 0 ms    | 0 ms            | 0 ms   | 1 ms            | 2 ms
>>>>>>>>>>>> Total Delay      | 24 ms      | 8 ms    | 10 ms           | 12 ms  | 14 ms           | 173 ms
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could you see what the Streaming tab in the Spark UI says? It
>>>>>>>>>>>>> should show the underlying batch duration of the
>>>>>>>>>>>>> StreamingContext, the details of when each batch starts, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> BTW, it seems that the 5.6 or 6.8 second delay is present only
>>>>>>>>>>>>> when data is present (that is, *Documents processed: > 0*).
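As the next message notes, the per-partition log line is printed after iterating over the messages, so its timestamp includes processing time. Logging in the driver-side foreachRDD callback before any per-partition work would give timestamps that track the batch boundaries themselves. A sketch against the thread's code (same assumed log field as earlier):

  messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
      // Log first, before any iteration, so the timestamp reflects the
      // batch boundary; partitions().size() is cheap and touches no data.
      log.info("Batch started; partitions={}", rdd.partitions().size());
      // ... existing per-partition processing goes here ...
      return null;
    }
  });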
>>>>>>>>>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Tathagata,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Checkpointing is turned on, but we were not recovering. I'm
>>>>>>>>>>>>>> looking at the logs now, feeding fresh content hours after the
>>>>>>>>>>>>>> restart. Here's a snippet:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>>>>>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>>>>>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>>>>>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Interestingly, the fresh content (4 documents) is fed about
>>>>>>>>>>>>>> 5.6 seconds after the previous batch, not 10 seconds. The
>>>>>>>>>>>>>> second fresh batch comes in 6.8 seconds after the previous
>>>>>>>>>>>>>> empty one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Granted, the log message is printed after iterating over the
>>>>>>>>>>>>>> messages, which may account for some of the time differences.
>>>>>>>>>>>>>> But generally, looking at the log messages printed before we
>>>>>>>>>>>>>> iterate, it's still 10 seconds each time, not the 20 that
>>>>>>>>>>>>>> batchdurationmillis is currently set to.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>>>>>>     KafkaUtils.createDirectStream(....);
>>>>>>>>>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>>>>>>>>>>>>>>   @Override
>>>>>>>>>>>>>>   public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>>>>>     return tuple2._2();
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>>>>>>   @Override
>>>>>>>>>>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>>>>>>>>>     rdd.foreachPartition(func);
>>>>>>>>>>>>>>     return null;
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>>>>>>>>>>   log.info("Starting data partition processing. AppName={}, topic={}...", appName, topic);
>>>>>>>>>>>>>>   // ... iterate ...
>>>>>>>>>>>>>>   log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.",
>>>>>>>>>>>>>>       appName, topic, docCount);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any ideas? Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Are you accidentally recovering from checkpoint files which
>>>>>>>>>>>>>>> have 10 seconds as the batch interval?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm seeing an oddity where I initially set the
>>>>>>>>>>>>>>>> batchdurationmillis to 1 second and it works fine:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>>>>>>>>     Durations.milliseconds(batchDurationMillis));
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Then I tried changing the value to 10 seconds. The change
>>>>>>>>>>>>>>>> didn't seem to take. I bounced the Spark workers and the
>>>>>>>>>>>>>>>> consumers, and now I'm seeing RDDs coming in roughly every
>>>>>>>>>>>>>>>> 10 seconds (not always exactly 10 seconds, according to the
>>>>>>>>>>>>>>>> logs).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, now I'm trying to change the value to 20 seconds
>>>>>>>>>>>>>>>> and it's just not taking. I've bounced the Spark master,
>>>>>>>>>>>>>>>> workers, and consumers, and the value seems "stuck" at 10
>>>>>>>>>>>>>>>> seconds.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>> - Dmitry
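The thread's upshot is that the batch interval is part of what JavaStreamingContext.getOrCreate() recovers, so a changed batchDurationMillis is silently ignored whenever checkpoint data exists. A defensive sketch in the same spirit as the helpers above (hypothetical method; same assumed log field and Hadoop FileSystem imports as earlier):

  // Warn at startup when checkpoint data already exists: getOrCreate() will
  // then rebuild the old context and keep its old batch interval, ignoring
  // the currently configured value.
  private static void warnIfRecoveringFromCheckpoint(String checkpointDir,
                                                     long requestedBatchMillis) throws IOException {
    Path path = new Path(checkpointDir);
    FileSystem fs = path.getFileSystem(new Configuration());
    if (fs.exists(path) && fs.listStatus(path).length > 0) {
      log.warn("Checkpoint data found in {}; getOrCreate() will recover the old context "
          + "and ignore batchDurationMillis={}", checkpointDir, requestedBatchMillis);
    }
  }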