The metadata checkpointing interval does not really affect performance, so I didn't expose any way to control it. The data checkpointing interval does affect performance, hence that interval is configurable.
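Concretely, here is a minimal sketch of where each knob lives in the Java API (the app name, checkpoint directory, host/port, and durations below are all illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointIntervals {
  public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setAppName("CheckpointIntervals");
    JavaStreamingContext jssc =
        new JavaStreamingContext(sparkConf, Durations.seconds(5));

    // Metadata checkpointing: turned on by setting a directory.
    // There is no API to change how often it happens.
    jssc.checkpoint("hdfs:///checkpoints/my-app");

    // Data (RDD) checkpointing: the interval IS configurable, per DStream.
    JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    lines.checkpoint(Durations.seconds(30)); // checkpoint this stream's RDDs every 30s

    lines.print(); // an output operation is required before start()
    jssc.start();
    jssc.awaitTermination();
  }
}

For stateful streams where checkpoint(interval) is never called explicitly, the default interval is a multiple of the batch interval that is at least 10 seconds (roughly the max(batchDurationMillis, 10 seconds) asked about below).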
On Thu, Sep 10, 2015 at 5:45 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> >> The whole point of checkpointing is to recover the *exact* computation
> >> where it left off.
>
> That makes sense. We were looking at the metadata checkpointing and the
> data checkpointing, and with data checkpointing you can specify a
> checkpoint duration value. With the metadata checkpointing, there doesn't
> seem to be a way; that may be the intent, but it wasn't clear why there's
> a way to override one duration (for data) but not the other (for
> metadata).
>
> The basic feeling was that we'd want to minimize the number of times
> Spark Streaming does the checkpointing I/O. In other words, find some
> sweet-spot value where we checkpoint frequently enough without performing
> I/O too frequently. Finding that sweet spot would mean experimenting with
> the checkpoint duration millis, but that parameter doesn't appear to be
> exposed in the case of metadata checkpointing.
>
> On Wed, Sep 9, 2015 at 10:39 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> The whole point of checkpointing is to recover the *exact* computation
>> where it left off.
>> If you want any change in the specification of the computation (which
>> includes any intervals), then you cannot recover from the checkpoint, as
>> it can be an arbitrarily complex issue to deal with changes in the
>> specs, especially because many specs are tied to each other (e.g. the
>> checkpoint interval dictates other things, like cleanup intervals, etc.).
>>
>> Why do you need to change the checkpointing interval at the time of
>> recovery? Trying to understand your use case.
>>
>> On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>
>>> >> when you use getOrCreate, and there exists a valid checkpoint, it
>>> >> will always return the context from the checkpoint and not call the
>>> >> factory. A simple way to see what's going on is to print something
>>> >> in the factory to verify whether it is ever called.
>>>
>>> This is probably OK. It seems to explain why we were getting a sticky
>>> batch duration millis value. Once I blew away all the checkpointing
>>> directories and unplugged the data checkpointing (while keeping the
>>> metadata checkpointing), the batch duration millis was no longer sticky.
>>>
>>> So there doesn't seem to be a way for metadata checkpointing to
>>> override its checkpoint duration millis, is there? Is the default there
>>> max(batchDurationMillis, 10 seconds)? Is there a way to override this?
>>> Thanks.
>>>
>>> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> See inline.
>>>>
>>>> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> What's wrong with creating a checkpointed context? We WANT
>>>>> checkpointing, first of all. We therefore WANT the checkpointed
>>>>> context.
>>>>>
>>>>> Second of all, it's not true that we're loading the checkpointed
>>>>> context independent of whether params.isCheckpointed() is true. I'm
>>>>> quoting the code again:
>>>>>
>>>>> // This is NOT loading a checkpointed context if isCheckpointed() is false.
>>>>> JavaStreamingContext jssc = params.isCheckpointed()
>>>>>     ? createCheckpointedContext(sparkConf, params)
>>>>>     : createContext(sparkConf, params);
>>>>>
>>>>> private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
>>>>>   JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>>>>>     @Override
>>>>>     public JavaStreamingContext create() {
>>>>>       return createContext(sparkConf, params);
>>>>>     }
>>>>>   };
>>>>>   return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);*
>>>>
>>>> ^^^^^ when you use getOrCreate, and there exists a valid checkpoint,
>>>> it will always return the context from the checkpoint and not call the
>>>> factory. A simple way to see what's going on is to print something in
>>>> the factory to verify whether it is ever called.
>>>>
>>>>> }
>>>>>
>>>>> private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
>>>>>   // Create the context with the specified batch interval, in milliseconds.
>>>>>   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>       Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>   // Set the checkpoint directory, if we're checkpointing.
>>>>>   if (params.isCheckpointed()) {
>>>>>     jssc.checkpoint(params.getCheckpointDir());
>>>>>   }
>>>>>   ...............
>>>>>
>>>>> Again, this is *only* calling context.checkpoint() if
>>>>> isCheckpointed() is true. And we WANT it to be true.
>>>>>
>>>>> What am I missing here?
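Following the suggestion above, here is a minimal sketch of the getOrCreate pattern with a print in the factory, so you can tell whether a fresh context was built or one was recovered from the checkpoint (the checkpoint directory, host/port, and batch interval are illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class GetOrCreateCheck {
  private static final String CHECKPOINT_DIR = "hdfs:///checkpoints/my-app";

  public static void main(String[] args) throws InterruptedException {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        // Prints ONLY when no valid checkpoint exists; on recovery,
        // getOrCreate rebuilds the context from the checkpoint and
        // never calls this factory.
        System.out.println("Factory invoked: creating a NEW context");
        SparkConf conf = new SparkConf().setAppName("GetOrCreateCheck");
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.checkpoint(CHECKPOINT_DIR);
        // The full DStream graph must be defined here, inside the factory.
        jssc.socketTextStream("localhost", 9999).print();
        return jssc;
      }
    };

    JavaStreamingContext jssc =
        JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, factory);
    jssc.start();
    jssc.awaitTermination();
  }
}

If the message never appears on restart, the context (including its original batch duration) came from the checkpoint, which matches the "sticky" batch duration millis behavior described above.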