>> The whole point of checkpointing is to recover the *exact* computation where it left off.
That makes sense. We were looking at the metadata checkpointing and the data checkpointing, and with data checkpointing, you can specify a checkpoint duration value. With the metadata checkpointing, there doesn't seem to be a way, which may be the intent but it wasn't clear why there's a way to override one duration (for data) but not the other (for metadata). The basic feel was that we'd want to minimize the number of times Spark Streaming is doing the checkpointing I/O. In other words, some sort of sweet spot value where we do checkpointing frequently enough without performing I/O too frequently. Finding that sweet spot would mean experimenting with the checkpoint duration millis but that parameter doesn't appear to be exposed in case of metadata checkpointing. On Wed, Sep 9, 2015 at 10:39 PM, Tathagata Das <t...@databricks.com> wrote: > The whole point of checkpointing is to recover the *exact* computation > where it left of. > If you want any change in the specification of the computation (which > includes any intervals), then you cannot recover from checkpoint as it can > be an arbitrarily complex issue to deal with changes in the specs, > especially because a lot of specs are tied to each other (e.g. checkpoint > interval dictates other things like clean up intervals, etc.) > > Why do you need to change the checkpointing interval at the time of > recovery? Trying to understand your usecase. > > > On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> >> when you use getOrCreate, and there exists a valid checkpoint, it will >> always return the context from the checkpoint and not call the factory. >> Simple way to see whats going on is to print something in the factory to >> verify whether it is ever called. >> >> This is probably OK. Seems to explain why we were getting a sticky batch >> duration millis value. Once I blew away all the checkpointing directories >> and unplugged the data checkpointing (while keeping the metadata >> checkpointing) the batch duration millis was no longer sticky. >> >> So, there doesn't seem to be a way for metadata checkpointing to override >> its checkpoint duration millis, is there? Is the default there >> max(batchdurationmillis, 10seconds)? Is there a way to override this? >> Thanks. >> >> >> >> >> >> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das <t...@databricks.com> >> wrote: >> >>> >>> >>> See inline. >>> >>> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg < >>> dgoldenberg...@gmail.com> wrote: >>> >>>> What's wrong with creating a checkpointed context?? We WANT >>>> checkpointing, first of all. We therefore WANT the checkpointed context. >>>> >>>> Second of all, it's not true that we're loading the checkpointed >>>> context independent of whether params.isCheckpointed() is true. I'm >>>> quoting the code again: >>>> >>>> // This is NOT loading a checkpointed context if isCheckpointed() is >>>> false. >>>> JavaStreamingContext jssc = params.isCheckpointed() ? >>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf, >>>> params); >>>> >>>> private JavaStreamingContext createCheckpointedContext(SparkConf >>>> sparkConf, Parameters params) { >>>> JavaStreamingContextFactory factory = new >>>> JavaStreamingContextFactory() { >>>> @Override >>>> public JavaStreamingContext create() { >>>> return createContext(sparkConf, params); >>>> } >>>> }; >>>> return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(), >>>> factory);* >>>> >>> ^^^^^ when you use getOrCreate, and there exists a valid checkpoint, >>> it will always return the context from the checkpoint and not call the >>> factory. Simple way to see whats going on is to print something in the >>> factory to verify whether it is ever called. >>> >>> >>> >>> >>> >>>> } >>>> >>>> private JavaStreamingContext createContext(SparkConf sparkConf, >>>> Parameters params) { >>>> // Create context with the specified batch interval, in >>>> milliseconds. >>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, >>>> Durations.milliseconds(params.getBatchDurationMillis())); >>>> // Set the checkpoint directory, if we're checkpointing >>>> if (params.isCheckpointed()) { >>>> jssc.checkpoint(params.getCheckpointDir()); >>>> >>>> } >>>> ............... >>>> Again, this is *only* calling context.checkpoint() if isCheckpointed() >>>> is true. And we WANT it to be true. >>>> >>>> What am I missing here? >>>> >>>> >>>> >> >