The metadata checkpointing interval does not really affect performance, so I
didn't expose any way to control it. The data checkpointing interval does
affect performance, hence that interval is configurable.
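
For reference, here is a minimal sketch of setting the data checkpointing
interval on a DStream. The socket source, directory, and interval values are
placeholder examples, not recommendations:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointIntervalExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("checkpoint-interval-example");
    JavaStreamingContext jssc =
        new JavaStreamingContext(conf, Durations.seconds(5));
    // Setting a checkpoint directory enables checkpointing.
    jssc.checkpoint("/tmp/checkpoints");

    JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    // Override the data checkpointing interval for this stream only.
    // It must be a multiple of the batch interval (here, 5 seconds).
    lines.checkpoint(Durations.seconds(30));

    lines.print();
    jssc.start();
    jssc.awaitTermination();
  }
}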



On Thu, Sep 10, 2015 at 5:45 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com
> wrote:

> >> The whole point of checkpointing is to recover the *exact* computation
> where it left off.
>
> That makes sense. We were looking at metadata checkpointing versus data
> checkpointing: with data checkpointing, you can specify a checkpoint
> duration value, but with metadata checkpointing there doesn't seem to be a
> way. That may be the intent, but it wasn't clear why there's a way to
> override one duration (for data) but not the other (for metadata).
>
> The basic idea was that we'd want to minimize the number of times Spark
> Streaming performs checkpointing I/O: in other words, find some sweet-spot
> value where we checkpoint frequently enough for recovery without performing
> I/O too often. Finding that sweet spot would mean experimenting with the
> checkpoint duration millis (see the sketch below), but that parameter
> doesn't appear to be exposed in the case of metadata checkpointing.
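>
> As a sketch of the kind of tuning meant here (the parameter name
> getCheckpointIntervalMultiple is hypothetical, and stream stands for any
> JavaDStream in the job; Spark's programming guide suggests a checkpoint
> interval of about 5 to 10 batch intervals as a starting point):
>
>   // Derive the data checkpoint interval as a configurable multiple of the
>   // batch duration, so the sweet spot can be found experimentally.
>   long batchMillis = params.getBatchDurationMillis();
>   long multiple = params.getCheckpointIntervalMultiple(); // e.g. 5 to 10
>   stream.checkpoint(Durations.milliseconds(batchMillis * multiple));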
>
>
>
> On Wed, Sep 9, 2015 at 10:39 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> The whole point of checkpointing is to recover the *exact* computation
>> where it left off.
>> If you want any change in the specification of the computation (which
>> includes any intervals), then you cannot recover from the checkpoint:
>> handling changes in the specs can be arbitrarily complex, especially
>> because many specs are tied to each other (e.g. the checkpoint interval
>> dictates other things, like cleanup intervals).
>>
>> Why do you need to change the checkpointing interval at the time of
>> recovery? Trying to understand your use case.
>>
>>
>> On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> >> when you use getOrCreate, and there exists a valid checkpoint, it
>>> will always return the context from the checkpoint and not call the
>>> factory. A simple way to see what's going on is to print something in
>>> the factory to verify whether it is ever called.
>>>
>>> This is probably OK. It seems to explain why we were getting a sticky
>>> batch duration millis value. Once I blew away all the checkpointing
>>> directories and unplugged the data checkpointing (while keeping the
>>> metadata checkpointing), the batch duration millis was no longer sticky.
>>>
>>> So, there doesn't seem to be a way for metadata checkpointing to
>>> override its checkpoint duration millis, is there?  Is the default there
>>> max(batchDurationMillis, 10 seconds)?  Is there a way to override this?
>>> Thanks.
>>>
>>> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> See inline.
>>>>
>>>> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> What's wrong with creating a checkpointed context??  We WANT
>>>>> checkpointing, first of all.  We therefore WANT the checkpointed context.
>>>>>
>>>>> Second of all, it's not true that we're loading the checkpointed
>>>>> context independent of whether params.isCheckpointed() is true.  I'm
>>>>> quoting the code again:
>>>>>
>>>>> // This is NOT loading a checkpointed context if isCheckpointed()
>>>>> // is false.
>>>>> JavaStreamingContext jssc = params.isCheckpointed()
>>>>>     ? createCheckpointedContext(sparkConf, params)
>>>>>     : createContext(sparkConf, params);
>>>>>
>>>>>   private JavaStreamingContext createCheckpointedContext(
>>>>>       SparkConf sparkConf, Parameters params) {
>>>>>     JavaStreamingContextFactory factory =
>>>>>         new JavaStreamingContextFactory() {
>>>>>       @Override
>>>>>       public JavaStreamingContext create() {
>>>>>         return createContext(sparkConf, params);
>>>>>       }
>>>>>     };
>>>>>     return JavaStreamingContext.getOrCreate(
>>>>>         params.getCheckpointDir(), factory);
>>>>>
>>>> ^^^^^   when you use getOrCreate, and there exists a valid checkpoint,
>>>> it will always return the context from the checkpoint and not call the
>>>> factory. A simple way to see what's going on is to print something in
>>>> the factory to verify whether it is ever called.
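>>>>
>>>> For instance, a minimal sketch of that diagnostic, adding only a log
>>>> line to the factory shown above:
>>>>
>>>>   JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>>>     @Override
>>>>     public JavaStreamingContext create() {
>>>>       // If a valid checkpoint exists, getOrCreate never calls create(),
>>>>       // so this message will not appear in the driver log on recovery.
>>>>       System.out.println("Factory invoked: creating a fresh context");
>>>>       return createContext(sparkConf, params);
>>>>     }
>>>>   };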
>>>>
>>>>>   }
>>>>>
>>>>>   private JavaStreamingContext createContext(
>>>>>       SparkConf sparkConf, Parameters params) {
>>>>>     // Create context with the specified batch interval, in milliseconds.
>>>>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>         Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>     // Set the checkpoint directory, if we're checkpointing.
>>>>>     if (params.isCheckpointed()) {
>>>>>       jssc.checkpoint(params.getCheckpointDir());
>>>>>     }
>>>>> ...............
>>>>> Again, this *only* calls jssc.checkpoint() if isCheckpointed() is
>>>>> true.  And we WANT it to be true.
>>>>>
>>>>> What am I missing here?
