Well, you are returning JavaStreamingContext.getOrCreate(params.
getCheckpointDir(), factory);
That is loading the checkpointed context, independent of whether params
.isCheckpointed() is true.




On Tue, Sep 8, 2015 at 8:28 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com>
wrote:

> That is good to know. However, that doesn't change the problem I'm seeing.
> Which is that, even with that piece of code commented out
> (stream.checkpoint()), the batch duration millis aren't getting changed
> unless I take checkpointing completely out.
>
> In other words, this commented out:
>
> //    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
> //
> messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
> //    }
>
> doesn't change the "stickiness" of the batch duration millis value.
> However, if the context is not checkpointed, the problem goes away and the
> batch duration millis setting is properly updated.
>
> I'll take the commented out piece out however I still need to figure out
> how to fix the batch duration millis so I can also keep the checkpointed
> context.
>
> The checkpointed context is basically created as I stated before, note the
> invokation of the checkpoint() method:
>
>   private JavaStreamingContext createCheckpointedContext(SparkConf
> sparkConf, Parameters params) {
>     JavaStreamingContextFactory factory = new
> JavaStreamingContextFactory() {
>       @Override
>       public JavaStreamingContext create() {
>         return createContext(sparkConf, params);
>       }
>     };
>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
> factory);
>   }
>
>   private JavaStreamingContext createContext(SparkConf sparkConf,
> Parameters params) {
>     // Create context with the specified batch interval, in milliseconds.
>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.milliseconds(params.getBatchDurationMillis()));
>     // Set the checkpoint directory, if we're checkpointing
>     if (params.isCheckpointed()) {
>       jssc.checkpoint(params.getCheckpointDir());
>     }
>
> .......
>
> On Tue, Sep 8, 2015 at 11:14 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Calling directKafkaStream.checkpoint() will make the system write the raw
>> kafka data into HDFS files (that is, RDD checkpointing). This is completely
>> unnecessary with Direct Kafka because it already tracks the offset of data
>> in each batch
>>
>
>
>> (which checkpoint is enabled using
>> streamingContext.checkpoint(checkpointDir)) and can recover from failure by
>> reading the exact same data back from Kafka.
>>
>>
>> TD
>>
>> On Tue, Sep 8, 2015 at 4:38 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> >> Why are you checkpointing the direct kafka stream? It serves not
>>> purpose.
>>>
>>> Could you elaborate on what you mean?
>>>
>>> Our goal is fault tolerance.  If a consumer is killed or stopped
>>> midstream, we want to resume where we left off next time the consumer is
>>> restarted.
>>>
>>> How would that be "not surving a purpose"?  This is already working for
>>> us.  From our testing, we indeed resume where we left off, which is not
>>> possible without checkpointing.  If checkpointing is turned off, we'll
>>> resume with the later Kafka topic entries, which would lead to skipping
>>> over some entries.
>>>
>>> Please elaborate.
>>>
>>> We need both checkpointing and the ability to set batch duration
>>> millis.  The Spark API provides both capabilities but somehow if
>>> checkpointing is turned on, our batch duration millis are always set to 10
>>> seconds internally by Spark.  What is the resolution?
>>>
>>>
>>> On Tue, Sep 8, 2015 at 7:23 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Why are you checkpointing the direct kafka stream? It serves not
>>>> purpose.
>>>>
>>>> TD
>>>>
>>>> On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> I just disabled checkpointing in our consumers and I can see that the
>>>>> batch duration millis set to 20 seconds is now being honored.
>>>>>
>>>>> Why would that be the case?
>>>>>
>>>>> And how can we "untie" batch duration millis from checkpointing?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Well, I'm not sure why you're checkpointing messages.
>>>>>>
>>>>>> I'd also put in some logging to see what values are actually being
>>>>>> read out of your params object for the various settings.
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>>> I've stopped the jobs, the workers, and the master. Deleted the
>>>>>>> contents of the checkpointing dir. Then restarted master, workers, and
>>>>>>> consumers.
>>>>>>>
>>>>>>> I'm seeing the job in question still firing every 10 seconds.  I'm
>>>>>>> seeing the 10 seconds in the Spark Jobs GUI page as well as our logs.
>>>>>>> Seems quite strange given that the jobs used to fire every 1 second, 
>>>>>>> we've
>>>>>>> switched to 10, now trying to switch to 20 and batch duration millis is 
>>>>>>> not
>>>>>>> changing.
>>>>>>>
>>>>>>> Does anything stand out in the code perhaps?
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Have you tried deleting or moving the contents of the checkpoint
>>>>>>>> directory and restarting the job?
>>>>>>>>
>>>>>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sorry, more relevant code below:
>>>>>>>>>
>>>>>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>>>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>>>>> createCheckpointedContext(sparkConf, params) : createContext(
>>>>>>>>> sparkConf, params);
>>>>>>>>> jssc.start();
>>>>>>>>> jssc.awaitTermination();
>>>>>>>>> jssc.close();
>>>>>>>>> ……………………………..
>>>>>>>>>   private JavaStreamingContext
>>>>>>>>> createCheckpointedContext(SparkConf sparkConf, Parameters params)
>>>>>>>>> {
>>>>>>>>>     JavaStreamingContextFactory factory = new
>>>>>>>>> JavaStreamingContextFactory() {
>>>>>>>>>       @Override
>>>>>>>>>       public JavaStreamingContext create() {
>>>>>>>>>         return createContext(sparkConf, params);
>>>>>>>>>       }
>>>>>>>>>     };
>>>>>>>>>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>>>>>>>> factory);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>>>>>>> Parameters params) {
>>>>>>>>>     // Create context with the specified batch interval, in
>>>>>>>>> milliseconds.
>>>>>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>>>>>     // Set the checkpoint directory, if we're checkpointing
>>>>>>>>>     if (params.isCheckpointed()) {
>>>>>>>>>       jssc.checkpoint(params.getCheckpointDir());
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     Set<String> topicsSet = new HashSet<String>(Arrays.asList(
>>>>>>>>> params.getTopic()));
>>>>>>>>>
>>>>>>>>>     // Set the Kafka parameters.
>>>>>>>>>     Map<String, String> kafkaParams = new HashMap<String,
>>>>>>>>> String>();
>>>>>>>>>     kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
>>>>>>>>> params.getBrokerList());
>>>>>>>>>     if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>>>>>>       kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET,
>>>>>>>>> params.getAutoOffsetReset());
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     // Create direct Kafka stream with the brokers and the topic.
>>>>>>>>>     JavaPairInputDStream<String, String> messages =
>>>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>>>       jssc,
>>>>>>>>>       String.class,
>>>>>>>>>       String.class,
>>>>>>>>>       StringDecoder.class,
>>>>>>>>>       StringDecoder.class,
>>>>>>>>>       kafkaParams,
>>>>>>>>>       topicsSet);
>>>>>>>>>
>>>>>>>>>     // See if there's an override of the default checkpoint
>>>>>>>>> duration.
>>>>>>>>>     if (params.isCheckpointed() && params.getCheckpointMillis() >
>>>>>>>>> 0L) {
>>>>>>>>>       messages.checkpoint(Durations.milliseconds(params
>>>>>>>>> .getCheckpointMillis()));
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     JavaDStream<String> messageBodies = messages.map(new
>>>>>>>>> Function<Tuple2<String, String>, String>() {
>>>>>>>>>       @Override
>>>>>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>         return tuple2._2();
>>>>>>>>>       }
>>>>>>>>>     });
>>>>>>>>>
>>>>>>>>>     messageBodies.foreachRDD(new Function<JavaRDD<String>,
>>>>>>>>> Void>() {
>>>>>>>>>       @Override
>>>>>>>>>       public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>         ProcessPartitionFunction func = new
>>>>>>>>> ProcessPartitionFunction(params);
>>>>>>>>>         rdd.foreachPartition(func);
>>>>>>>>>         return null;
>>>>>>>>>       }
>>>>>>>>>     });
>>>>>>>>>
>>>>>>>>>     return jssc;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'd think that we wouldn't be "accidentally recovering from
>>>>>>>>>> checkpoint" hours or even days after consumers have been restarted, 
>>>>>>>>>> plus
>>>>>>>>>> the content is the fresh content that I'm feeding, not some content 
>>>>>>>>>> that
>>>>>>>>>> had been fed before the last restart.
>>>>>>>>>>
>>>>>>>>>> The code is basically as follows:
>>>>>>>>>>
>>>>>>>>>>     SparkConf sparkConf = createSparkConf(...);
>>>>>>>>>>     // We'd be 'checkpointed' because we specify a checkpoint
>>>>>>>>>> directory which makes isCheckpointed true
>>>>>>>>>>     JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>>>>>> createCheckpointedContext(sparkConf, params) : createContext(
>>>>>>>>>> sparkConf, params);    jssc.start();
>>>>>>>>>>
>>>>>>>>>>     jssc.awaitTermination();
>>>>>>>>>>
>>>>>>>>>>     jssc.close();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <
>>>>>>>>>> t...@databricks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you sure you are not accidentally recovering from
>>>>>>>>>>> checkpoint? How are you using StreamingContext.getOrCreate() in 
>>>>>>>>>>> your code?
>>>>>>>>>>>
>>>>>>>>>>> TD
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>>>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Tathagata,
>>>>>>>>>>>>
>>>>>>>>>>>> In our logs I see the batch duration millis being set first to
>>>>>>>>>>>> 10 then to 20 seconds. I don't see the 20 being reflected later 
>>>>>>>>>>>> during
>>>>>>>>>>>> ingestion.
>>>>>>>>>>>>
>>>>>>>>>>>> In the Spark UI under Streaming I see the below output, notice
>>>>>>>>>>>> the *10 second* Batch interval.  Can you think of a reason why
>>>>>>>>>>>> it's stuck at 10?  It used to be 1 second by the way, then somehow 
>>>>>>>>>>>> over the
>>>>>>>>>>>> course of a few restarts we managed to get it to be 10 seconds.  
>>>>>>>>>>>> Now it
>>>>>>>>>>>> won't get reset to 20 seconds.  Any ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> Streaming
>>>>>>>>>>>>
>>>>>>>>>>>>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>>>>>>>>>>    - *Time since start: *1 day 8 hours 44 minutes
>>>>>>>>>>>>    - *Network receivers: *0
>>>>>>>>>>>>    - *Batch interval: *10 seconds
>>>>>>>>>>>>    - *Processed batches: *11790
>>>>>>>>>>>>    - *Waiting batches: *0
>>>>>>>>>>>>    - *Received records: *0
>>>>>>>>>>>>    - *Processed records: *0
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Statistics over last 100 processed batchesReceiver Statistics
>>>>>>>>>>>> No receivers
>>>>>>>>>>>> Batch Processing Statistics
>>>>>>>>>>>>
>>>>>>>>>>>>    MetricLast batchMinimum25th percentileMedian75th percentile
>>>>>>>>>>>>    MaximumProcessing Time23 ms7 ms10 ms11 ms14 ms172 msScheduling
>>>>>>>>>>>>    Delay1 ms0 ms0 ms0 ms1 ms2 msTotal Delay24 ms8 ms10 ms12 ms14
>>>>>>>>>>>>    ms173 ms
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <
>>>>>>>>>>>> t...@databricks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could you see what the streaming tab in the Spark UI says? It
>>>>>>>>>>>>> should show the underlying batch duration of the 
>>>>>>>>>>>>> StreamingContext, the
>>>>>>>>>>>>> details of when the batch starts, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> BTW, it seems that the 5.6 or 6.8 seconds delay is present
>>>>>>>>>>>>> only when data is present (that is, * Documents processed: >
>>>>>>>>>>>>> 0)*
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>>>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Tathagata,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Checkpointing is turned on but we were not recovering. I'm
>>>>>>>>>>>>>> looking at the logs now, feeding fresh content hours after the 
>>>>>>>>>>>>>> restart.
>>>>>>>>>>>>>> Here's a snippet:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>>>>>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>>>>>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>>>>>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>>>>>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Interestingly, the fresh content (4 documents) is fed about
>>>>>>>>>>>>>> 5.6 seconds after the previous batch, not 10 seconds. The second 
>>>>>>>>>>>>>> fresh
>>>>>>>>>>>>>> batch comes in 6.8 seconds after the previous empty one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Granted, the log message is printed after iterating over the
>>>>>>>>>>>>>> messages which may account for some time differences. But 
>>>>>>>>>>>>>> generally,
>>>>>>>>>>>>>> looking at the log messages being printed before we iterate, 
>>>>>>>>>>>>>> it's still 10
>>>>>>>>>>>>>> seconds each time, not 20 which is what batchdurationmillis is 
>>>>>>>>>>>>>> currently
>>>>>>>>>>>>>> set to.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>>>>>> KafkaUtils.createDirectStream(....);
>>>>>>>>>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   JavaDStream<String> messageBodies = messages.map(new 
>>>>>>>>>>>>>> Function<Tuple2<String,
>>>>>>>>>>>>>> String>, String>() {
>>>>>>>>>>>>>>       @Override
>>>>>>>>>>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>>>>>         return tuple2._2();
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     messageBodies.foreachRDD(new Function<JavaRDD<String>,
>>>>>>>>>>>>>> Void>() {
>>>>>>>>>>>>>>       @Override
>>>>>>>>>>>>>>       public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   ProcessPartitionFunction func = new 
>>>>>>>>>>>>>> ProcessPartitionFunction(...);
>>>>>>>>>>>>>>         rdd.foreachPartition(func);
>>>>>>>>>>>>>>         return null;
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public void call(Iterator<String> messageIterator) throws 
>>>>>>>>>>>>>> Exception
>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>     log.info("Starting data partition processing.
>>>>>>>>>>>>>> AppName={}, topic={}.)...", appName, topic);
>>>>>>>>>>>>>>     // ... iterate ...
>>>>>>>>>>>>>>     log.info("Finished data partition processing
>>>>>>>>>>>>>> (appName={}, topic={}). Documents processed: {}.", appName,
>>>>>>>>>>>>>> topic, docCount);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any ideas? Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <
>>>>>>>>>>>>>> t...@databricks.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Are you accidentally recovering from checkpoint files which
>>>>>>>>>>>>>>> has 10 second as the batch interval?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>>>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm seeing an oddity where I initially set the
>>>>>>>>>>>>>>>> batchdurationmillis to 1 second and it works fine:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(
>>>>>>>>>>>>>>>> sparkConf, Durations.milliseconds(batchDurationMillis));
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Then I tried changing the value to 10 seconds. The change
>>>>>>>>>>>>>>>> didn't seem to take. I've bounced the Spark workers and the 
>>>>>>>>>>>>>>>> consumers and
>>>>>>>>>>>>>>>> now I'm seeing RDD's coming in once around 10 seconds (not 
>>>>>>>>>>>>>>>> always 10
>>>>>>>>>>>>>>>> seconds according to the logs).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, now I'm trying to change the value to 20 seconds
>>>>>>>>>>>>>>>> and it's just not taking. I've bounced Spark master, workers, 
>>>>>>>>>>>>>>>> and consumers
>>>>>>>>>>>>>>>> and the value seems "stuck" at 10 seconds.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to