Why are you checkpointing the direct Kafka stream? It serves no purpose.

TD
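
For context, the call being questioned is the messages.checkpoint(...) on the DStream returned by KafkaUtils.createDirectStream, quoted further down in this thread. Below is a minimal sketch of the same stream without that call; it reuses jssc, kafkaParams and topicsSet from that quoted code and is illustrative only, not a drop-in patch:

    // Direct Kafka stream without DStream-level (data) checkpointing.
    // A KafkaRDD produced by the direct approach can be recomputed from its
    // stored offset ranges, so persisting the message data buys nothing here.
    // Metadata checkpointing via jssc.checkpoint(checkpointDir) is a separate
    // concern and is what driver recovery actually uses.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet);
    // Note: no messages.checkpoint(...) call.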

On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com>
wrote:

> I just disabled checkpointing in our consumers and I can see that the
> batch duration millis set to 20 seconds is now being honored.
>
> Why would that be the case?
>
> And how can we "untie" batch duration millis from checkpointing?
>
> Thanks.
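
One likely explanation: the batch interval is stored in the checkpoint, and JavaStreamingContext.getOrCreate() only invokes the factory when no valid checkpoint exists, so a recovered context keeps whatever interval was in effect when the checkpoint was written. A rough sketch of how to see which path was taken, reusing sparkConf and params from the code later in the thread (the log call assumes an SLF4J-style logger):

    // If this factory never logs, getOrCreate() found an existing checkpoint and
    // returned the recovered context; the new batchDurationMillis is never applied.
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        log.info("Creating a fresh StreamingContext with batchDurationMillis={}",
            params.getBatchDurationMillis());
        return createContext(sparkConf, params);
      }
    };
    JavaStreamingContext jssc =
        JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);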
>
> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Well, I'm not sure why you're checkpointing messages.
>>
>> I'd also put in some logging to see what values are actually being read
>> out of your params object for the various settings.
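
A minimal example of that kind of logging, using the getters that appear in the code further down in the thread (the logger itself is assumed):

    // Log the effective settings once at startup so they can be compared with
    // what the Streaming tab of the UI reports.
    log.info("batchDurationMillis={}, checkpointed={}, checkpointDir={}, checkpointMillis={}",
        params.getBatchDurationMillis(),
        params.isCheckpointed(),
        params.getCheckpointDir(),
        params.getCheckpointMillis());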
>>
>>
>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> I've stopped the jobs, the workers, and the master. Deleted the contents
>>> of the checkpointing dir. Then restarted master, workers, and consumers.
>>>
>>> I'm seeing the job in question still firing every 10 seconds. I'm
>>> seeing the 10 seconds on the Spark Jobs UI page as well as in our logs.
>>> That seems quite strange given that the jobs used to fire every 1 second;
>>> we switched to 10, and now that we're trying to switch to 20 the batch
>>> duration millis is not changing.
>>>
>>> Does anything stand out in the code perhaps?
>>>
>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Have you tried deleting or moving the contents of the checkpoint
>>>> directory and restarting the job?
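
If the interval change is deliberate and losing the saved offsets is acceptable, clearing the directory can also be done from the driver before the context is built. A sketch using the Hadoop FileSystem API (which covers both local and HDFS checkpoint paths); the helper name is made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Recursively delete the checkpoint directory so the next start builds a
    // fresh StreamingContext (with the new batch interval) instead of recovering
    // the old one. This also discards the stored Kafka offsets, so consumption
    // resumes according to auto.offset.reset.
    void clearCheckpointDir(String checkpointDir) throws java.io.IOException {
      Path path = new Path(checkpointDir);
      FileSystem fs = path.getFileSystem(new Configuration());
      if (fs.exists(path)) {
        fs.delete(path, true);
      }
    }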
>>>>
>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> Sorry, more relevant code below:
>>>>>
>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>> JavaStreamingContext jssc = params.isCheckpointed()
>>>>>     ? createCheckpointedContext(sparkConf, params)
>>>>>     : createContext(sparkConf, params);
>>>>> jssc.start();
>>>>> jssc.awaitTermination();
>>>>> jssc.close();
>>>>> ……………………………..
>>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
>>>>>     JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>>>>       @Override
>>>>>       public JavaStreamingContext create() {
>>>>>         return createContext(sparkConf, params);
>>>>>       }
>>>>>     };
>>>>>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
>>>>>   }
>>>>>
>>>>>   private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
>>>>>     // Create the context with the specified batch interval, in milliseconds.
>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>         Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>
>>>>>     // Set the checkpoint directory, if we're checkpointing.
>>>>>     if (params.isCheckpointed()) {
>>>>>       jssc.checkpoint(params.getCheckpointDir());
>>>>>     }
>>>>>
>>>>>     Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));
>>>>>
>>>>>     // Set the Kafka parameters.
>>>>>     Map<String, String> kafkaParams = new HashMap<String, String>();
>>>>>     kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
>>>>>     if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>>       kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
>>>>>     }
>>>>>
>>>>>     // Create the direct Kafka stream with the brokers and the topic.
>>>>>     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>         jssc,
>>>>>         String.class,
>>>>>         String.class,
>>>>>         StringDecoder.class,
>>>>>         StringDecoder.class,
>>>>>         kafkaParams,
>>>>>         topicsSet);
>>>>>
>>>>>     // See if there's an override of the default checkpoint duration.
>>>>>     if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>>>       messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>>>>>     }
>>>>>
>>>>>     JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
>>>>>       @Override
>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>         return tuple2._2();
>>>>>       }
>>>>>     });
>>>>>
>>>>>     messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>       @Override
>>>>>       public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>         ProcessPartitionFunction func = new ProcessPartitionFunction(params);
>>>>>         rdd.foreachPartition(func);
>>>>>         return null;
>>>>>       }
>>>>>     });
>>>>>
>>>>>     return jssc;
>>>>>   }
>>>>>
>>>>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> I'd think that we wouldn't be "accidentally recovering from
>>>>>> checkpoint" hours or even days after the consumers have been restarted.
>>>>>> Besides, the content is the fresh content that I'm feeding, not content
>>>>>> that had been fed before the last restart.
>>>>>>
>>>>>> The code is basically as follows:
>>>>>>
>>>>>>     SparkConf sparkConf = createSparkConf(...);
>>>>>>     // We'd be 'checkpointed' because we specify a checkpoint directory,
>>>>>>     // which makes isCheckpointed true.
>>>>>>     JavaStreamingContext jssc = params.isCheckpointed()
>>>>>>         ? createCheckpointedContext(sparkConf, params)
>>>>>>         : createContext(sparkConf, params);
>>>>>>     jssc.start();
>>>>>>     jssc.awaitTermination();
>>>>>>     jssc.close();
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Are you sure you are not accidentally recovering from checkpoint?
>>>>>>> How are you using StreamingContext.getOrCreate() in your code?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Tathagata,
>>>>>>>>
>>>>>>>> In our logs I see the batch duration millis being set first to 10
>>>>>>>> then to 20 seconds. I don't see the 20 being reflected later during
>>>>>>>> ingestion.
>>>>>>>>
>>>>>>>> In the Spark UI, under Streaming, I see the output below; notice the *10
>>>>>>>> second* batch interval.  Can you think of a reason why it's stuck at 10?
>>>>>>>> It used to be 1 second, by the way; then somehow, over the course of a few
>>>>>>>> restarts, we managed to get it to 10 seconds.  Now it won't get reset to 20
>>>>>>>> seconds.  Any ideas?
>>>>>>>>
>>>>>>>> Streaming
>>>>>>>>
>>>>>>>>    - *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>>>>>>    - *Time since start: *1 day 8 hours 44 minutes
>>>>>>>>    - *Network receivers: *0
>>>>>>>>    - *Batch interval: *10 seconds
>>>>>>>>    - *Processed batches: *11790
>>>>>>>>    - *Waiting batches: *0
>>>>>>>>    - *Received records: *0
>>>>>>>>    - *Processed records: *0
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Statistics over last 100 processed batches
>>>>>>>>
>>>>>>>> Receiver Statistics: No receivers
>>>>>>>>
>>>>>>>> Batch Processing Statistics:
>>>>>>>>
>>>>>>>>    Metric             Last batch   Minimum   25th percentile   Median   75th percentile   Maximum
>>>>>>>>    Processing Time    23 ms        7 ms      10 ms             11 ms    14 ms             172 ms
>>>>>>>>    Scheduling Delay   1 ms         0 ms      0 ms              0 ms     1 ms              2 ms
>>>>>>>>    Total Delay        24 ms        8 ms      10 ms             12 ms    14 ms             173 ms
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das <t...@databricks.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Could you see what the streaming tab in the Spark UI says? It
>>>>>>>>> should show the underlying batch duration of the StreamingContext, the
>>>>>>>>> details of when the batch starts, etc.
>>>>>>>>>
>>>>>>>>> BTW, it seems that the 5.6 or 6.8 second delay is present only
>>>>>>>>> when data is present (that is, when *Documents processed* > 0).
>>>>>>>>>
>>>>>>>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Tathagata,
>>>>>>>>>>
>>>>>>>>>> Checkpointing is turned on but we were not recovering. I'm
>>>>>>>>>> looking at the logs now, feeding fresh content hours after the 
>>>>>>>>>> restart.
>>>>>>>>>> Here's a snippet:
>>>>>>>>>>
>>>>>>>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>>>>>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>>>>>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>>>>>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>>>>>>>> ...
>>>>>>>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>>>>>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>>>>>>>
>>>>>>>>>> Interestingly, the fresh content (4 documents) is fed about 5.6
>>>>>>>>>> seconds after the previous batch, not 10 seconds. The second fresh 
>>>>>>>>>> batch
>>>>>>>>>> comes in 6.8 seconds after the previous empty one.
>>>>>>>>>>
>>>>>>>>>> Granted, the log message is printed after iterating over the
>>>>>>>>>> messages, which may account for some of the time differences. But
>>>>>>>>>> generally, looking at the log messages printed before we iterate, it's
>>>>>>>>>> still 10 seconds each time, not the 20 that batchDurationMillis is
>>>>>>>>>> currently set to.
>>>>>>>>>>
>>>>>>>>>> Code:
>>>>>>>>>>
>>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>>     KafkaUtils.createDirectStream(....);
>>>>>>>>>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>>>>>>>>>
>>>>>>>>>> JavaDStream<String> messageBodies = messages.map(
>>>>>>>>>>     new Function<Tuple2<String, String>, String>() {
>>>>>>>>>>       @Override
>>>>>>>>>>       public String call(Tuple2<String, String> tuple2) {
>>>>>>>>>>         return tuple2._2();
>>>>>>>>>>       }
>>>>>>>>>>     });
>>>>>>>>>>
>>>>>>>>>> messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>>>>>>   @Override
>>>>>>>>>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>>>>>>     ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>>>>>>>>>>     rdd.foreachPartition(func);
>>>>>>>>>>     return null;
>>>>>>>>>>   }
>>>>>>>>>> });
>>>>>>>>>>
>>>>>>>>>> The log message comes from ProcessPartitionFunction:
>>>>>>>>>>
>>>>>>>>>> public void call(Iterator<String> messageIterator) throws Exception {
>>>>>>>>>>   log.info("Starting data partition processing. AppName={}, topic={}...", appName, topic);
>>>>>>>>>>   // ... iterate ...
>>>>>>>>>>   log.info("Finished data partition processing (appName={}, topic={}). Documents processed: {}.", appName, topic, docCount);
>>>>>>>>>> }
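
Since the "Documents processed" line is only emitted after the iteration finishes, its timestamp is the batch start plus the processing time, which blurs the apparent interval. One way to read the true batch cadence straight from the logs is to log the batch time up front via the two-argument foreachRDD overload. A sketch, assuming the same messageBodies, params, and logger as in the code above (it needs org.apache.spark.api.java.function.Function2 and org.apache.spark.streaming.Time):

    messageBodies.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd, Time batchTime) throws Exception {
        // Logged before any work, so consecutive timestamps reflect the actual
        // batch interval rather than interval plus processing time.
        log.info("Batch fired at {} ({} ms)",
            new java.util.Date(batchTime.milliseconds()), batchTime.milliseconds());
        rdd.foreachPartition(new ProcessPartitionFunction(params));
        return null;
      }
    });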
>>>>>>>>>>
>>>>>>>>>> Any ideas? Thanks.
>>>>>>>>>>
>>>>>>>>>> - Dmitry
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das <
>>>>>>>>>> t...@databricks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you accidentally recovering from checkpoint files which have
>>>>>>>>>>> 10 seconds as the batch interval?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
>>>>>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm seeing an oddity where I initially set the
>>>>>>>>>>>> batchDurationMillis to 1 second and it works fine:
>>>>>>>>>>>>
>>>>>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>>>>>>>> Durations.milliseconds(batchDurationMillis));
>>>>>>>>>>>>
>>>>>>>>>>>> Then I tried changing the value to 10 seconds. The change
>>>>>>>>>>>> didn't seem to take. I bounced the Spark workers and the consumers,
>>>>>>>>>>>> and now I'm seeing RDDs coming in roughly every 10 seconds (not
>>>>>>>>>>>> always exactly 10 seconds according to the logs).
>>>>>>>>>>>>
>>>>>>>>>>>> However, now I'm trying to change the value to 20 seconds and
>>>>>>>>>>>> it's just not taking. I've bounced the Spark master, workers, and
>>>>>>>>>>>> consumers, and the value seems "stuck" at 10 seconds.
>>>>>>>>>>>>
>>>>>>>>>>>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> - Dmitry
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
