Hi Eno,

Sounds good to me. The only reason I can think of to keep the separate config is if we want to be able to turn checkpointing off. Guozhang, thoughts?

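To make the two options concrete, here is a rough sketch of a time-based checkpoint check that only runs at commit time. It is a toy model, not Kafka Streams code: `CheckpointTimer`, `should_checkpoint` and `checkpoint_times` are hypothetical names, and the "non-positive value disables checkpointing" rule is taken from the KIP text quoted further down the thread.

```python
class CheckpointTimer:
    """Hypothetical helper: decides at each commit whether to write a checkpoint."""

    def __init__(self, checkpoint_interval_ms: int) -> None:
        self.interval_ms = checkpoint_interval_ms
        self.last_checkpoint_ms = 0  # assume the task started at t = 0

    def should_checkpoint(self, now_ms: int) -> bool:
        # A non-positive interval means checkpointing is switched off.
        if self.interval_ms <= 0:
            return False
        if now_ms - self.last_checkpoint_ms >= self.interval_ms:
            self.last_checkpoint_ms = now_ms
            return True
        return False


def checkpoint_times(commit_interval_ms: int, checkpoint_interval_ms: int, run_ms: int) -> list:
    """Return the commit times at which a checkpoint file would be written."""
    timer = CheckpointTimer(checkpoint_interval_ms)
    commits = range(commit_interval_ms, run_ms + 1, commit_interval_ms)
    return [t for t in commits if timer.should_checkpoint(t)]


# Separate 5 minute setting with a 30 second commit interval.
print(checkpoint_times(30_000, 300_000, 600_000))
# Checkpoint on every commit (Eno's suggestion).
print(checkpoint_times(30_000, 30_000, 90_000))
```

Because the check only happens on commit, a checkpoint interval that is not a multiple of the commit interval is effectively rounded up to the next commit, which is the "multiple of the commit interval" behaviour discussed below.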
On Fri, 10 Feb 2017 at 10:28 Eno Thereska <eno.there...@gmail.com> wrote:
> Question: if checkpointing is so cheap, why not do it every commit
> interval? That way we can get rid of this extra config variable and just
> use the existing commit interval.
>
> Fewer tuning knobs.
>
> Eno
>
> > On 10 Feb 2017, at 09:27, Damian Guy <damian....@gmail.com> wrote:
> >
> > Guozhang,
> >
> > You've confused me. The failure scenarios you have described are the same
> > as they are today. With the checkpoint files in place less data will be
> > replayed, so there will be fewer duplicates.
> >
> > Are you saying you'd like the option to turn checkpointing off?
> >
> > Thanks,
> > Damian
> >
> > On Thu, 9 Feb 2017 at 21:55 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Eno,
> >>
> >> You are right, it is not a new scenario.
> >>
> >> Thinking a bit more on how we could incorporate KIP-98 in Streams, I feel
> >> that if EOS is turned on inside Streams, then we probably cannot always
> >> resume from the checkpointed offsets as it is not guaranteed to be
> >> "consistent"; but since EOS may not be turned on by default, it is still
> >> worthwhile to add this feature I guess.
> >>
> >> About the default config values: I think the default value of 5 min is OK
> >> to me, since restoration is usually faster than normal processing (unless
> >> your traffic was really high). About allowing it to be "turned off" with a
> >> non-positive value: I feel there is still value in keeping this door open,
> >> as in the future if EOS is turned on, people may just want to turn off
> >> checkpointing anyway, or there may be other scenarios that we have not
> >> realized yet. On the other hand, I would argue that it is less likely that
> >> users mistakenly set it to a non-positive value.
> >>
> >> Guozhang
> >>
> >> On Thu, Feb 9, 2017 at 1:03 PM, Eno Thereska <eno.there...@gmail.com>
> >> wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> It seems to me we have the same semantics today.
> >>> Are you saying there is a new failure scenario?
> >>>
> >>> Thanks,
> >>> Eno
> >>>
> >>>> On 9 Feb 2017, at 19:42, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>> More specifically, here is my reasoning about the failure cases, and I
> >>>> would like to get your feedback:
> >>>>
> >>>> *StreamTask*
> >>>>
> >>>> For a stream task, the committing order is 1) flush state (may send more
> >>>> records to the changelog in the producer), 2) flush producer, 3) commit
> >>>> upstream offsets. My understanding is that the writing of the checkpoint
> >>>> file will be between 2) and 3), so that the new order will be 1) flush
> >>>> state, 2) flush producer, 3) write checkpoint file (when necessary),
> >>>> 4) commit upstream offsets.
> >>>>
> >>>> And we have a bunch of "changelog offsets" regarding the state: a) the
> >>>> offset corresponding to the image of the persistent file, name it
> >>>> offset A, b) the log end offset, name it offset B, c) the offset recorded
> >>>> in the checkpoint file, name it offset C, d) the offset corresponding to
> >>>> the current committed upstream offset, name it offset D.
> >>>>
> >>>> Now let's talk about the failure cases:
> >>>>
> >>>> If there is a crash between 1) and 2), then A > B = C = D. In this case,
> >>>> if we restore, we will replay no logs at all since B = C while the
> >>>> persistent state file is actually "ahead of time", and we will start
> >>>> reprocessing from the input offset corresponding to D = B < A and hence
> >>>> have some duplicates, *which will be incorrect* if the update logic
> >>>> involves reading the state store values as well (i.e. not a blind write),
> >>>> e.g. aggregations.
> >>>>
> >>>> If there is a crash between 2) and 3), then A = B > C = D. When we
> >>>> restore, we will replay from C -> B = A, and then start reprocessing from
> >>>> the input offset corresponding to D < A, and the same issue applies as
> >>>> above.
> >>>>
> >>>> If there is a crash between 3) and 4), then A = B = C > D. When we
> >>>> restore, we will not replay, and then start reprocessing from the input
> >>>> offset corresponding to D < A, and the same issue applies as above.
> >>>>
> >>>>
> >>>> *StandbyTask*
> >>>>
> >>>> We only do one operation today, which is 1) flush state; I think we will
> >>>> add the writing of the checkpoint file after it as step 2).
> >>>>
> >>>> Failure cases again: offset A -> corresponds to the image of the file,
> >>>> offset B -> changelog end offset, offset C -> as written in the
> >>>> checkpoint file.
> >>>>
> >>>> If there is a crash between 1) and 2), then B >= A > C (B >= A because we
> >>>> are reading from the changelog topic so A will never be greater than B),
> >>>>
> >>>> 1) and if this task resumes as a standby task, we will resume restoration
> >>>> from offset C, and a few duplicates from C -> A will be applied again to
> >>>> the local state files, then continue from A -> B. *This is OK* since they
> >>>> do not incur any computations, hence have no side effects, and are all
> >>>> idempotent.
> >>>>
> >>>> 2) and if this task resumes as a stream task, we will replay changelogs
> >>>> from C -> A, with duplicated updates, and then from A -> B. This is also
> >>>> OK for the same reason as above.
> >>>>
> >>>>
> >>>>
> >>>> So it seems to me that this is not safe for a StreamTask, or maybe the
> >>>> writing of the checkpoint file in your mind is different?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> A quick question re: `We will add the above config parameter to
> >>>>> *StreamsConfig*.
> >>>>> During *StreamTask#commit()*, *StandbyTask#commit()*, and
> >>>>> *GlobalUpdateStateTask#flushState()* we will check if the checkpoint
> >>>>> interval has elapsed and write the checkpoint file.`
> >>>>>
> >>>>> Will the writing of the checkpoint file happen before the flushing of
> >>>>> the state manager?
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax <matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> But 5 min means that we (in the worst case) need to replay data from
> >>>>>> the last 5 minutes to get the store ready.
> >>>>>>
> >>>>>> So why not go with the min possible value of 30 seconds to speed up
> >>>>>> this process if the impact is negligible anyway?
> >>>>>>
> >>>>>> What do you gain by being conservative?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 2/9/17 2:54 AM, Damian Guy wrote:
> >>>>>>> Why shouldn't it be 5 minutes? ;-)
> >>>>>>> It is a finger in the air number. The testing I did shows that there
> >>>>>>> isn't much, if any, overhead when checkpointing a single store on the
> >>>>>>> commit interval. The default commit interval is 30 seconds, so it
> >>>>>>> could possibly be set to that. However, I'd prefer to be a little
> >>>>>>> conservative so 5 minutes seemed reasonable.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, 9 Feb 2017 at 10:25 Michael Noll <mich...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Damian,
> >>>>>>>>
> >>>>>>>> could you elaborate briefly why the default value should be 5
> >>>>>>>> minutes? What are the considerations, assumptions, etc. that go into
> >>>>>>>> picking this value?
> >>>>>>>>
> >>>>>>>> Right now, in the KIP and in this discussion, "5 mins" looks like a
> >>>>>>>> magic number to me.
> >>>>>>>> :-)
> >>>>>>>>
> >>>>>>>> -Michael
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy <damian....@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I've run the SimpleBenchmark with checkpoint on and off to see what
> >>>>>>>>> the impact is. It appears that there is very little impact, if any.
> >>>>>>>>> The numbers with checkpointing on actually look better, but that is
> >>>>>>>>> likely largely due to external influences.
> >>>>>>>>>
> >>>>>>>>> In any case, I'm going to suggest we go with a default checkpoint
> >>>>>>>>> interval of 5 minutes. I've updated the KIP with this.
> >>>>>>>>>
> >>>>>>>>> commit every 10 seconds (no checkpoint)
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/34798/287372.83751939767/29.570664980746017
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/35942/278226.0308274442/28.62945857214401
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/31192/320595.02436522185/32.98922800718133
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> checkpoint every 10 seconds (same as commit interval)
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/36997/270292.185852907/27.81306592426413
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/32087/311652.69423754164/32.069062237043035
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/32895/303997.5680194558/31.281349749202004
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/33476/298721.4720994145/30.738439479029754
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
> >>>>>>>>> 10000000/33196/301241.1133871551/30.99771056753826
> >>>>>>>>>
> >>>>>>>>> On Wed, 8 Feb 2017 at 09:02 Damian Guy <damian....@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Matthias,
> >>>>>>>>>>
> >>>>>>>>>> Fair point. I'll update the KIP.
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Damian,
> >>>>>>>>>>
> >>>>>>>>>> I am not strict about it either. However, if there is no advantage
> >>>>>>>>>> in disabling it, we might not want to allow it. This would have the
> >>>>>>>>>> advantage of guarding users against accidentally switching it off.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 2/3/17 2:03 AM, Damian Guy wrote:
> >>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>
> >>>>>>>>>>> It possibly doesn't make sense to disable it, but then I'm sure
> >>>>>>>>>>> someone will come up with a reason they don't want it!
> >>>>>>>>>>> I'm happy to change it such that the checkpoint interval must
> >>>>>>>>>>> be > 0.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Damian
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Damian.
> >>>>>>>>>>>>
> >>>>>>>>>>>> One more question: "Checkpointing is disabled if the checkpoint
> >>>>>>>>>>>> interval is set to a value <=0."
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Does it make sense to disable checkpointing? What's the tradeoff
> >>>>>>>>>>>> here?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
> >>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the comments.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. TBD - I need to do some performance tests and try to work out
> >>>>>>>>>>>>> a sensible default.
> >>>>>>>>>>>>> 2. Yes, you are correct. It could be a multiple of the
> >>>>>>>>>>>>> commit.interval.ms. But that would also mean that if you change
> >>>>>>>>>>>>> the commit interval - say you lower it - then you might also
> >>>>>>>>>>>>> need to change the checkpoint setting (i.e., you still only want
> >>>>>>>>>>>>> to checkpoint every n minutes).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax
> >>>>>>>>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the KIP Damian.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am wondering about two things:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. what should be the default value for the new parameter?
> >>>>>>>>>>>>>> 2. why is the new parameter provided in ms?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About (2): because
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "the minimum checkpoint interval will be the value of
> >>>>>>>>>>>>>> commit.interval.ms. In effect the actual checkpoint interval
> >>>>>>>>>>>>>> will be a multiple of the commit interval"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> it might be easier to just use a parameter that is
> >>>>>>>>>>>>>> "number-of-commit intervals".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
> >>>>>>>>>>>>>>> Thanks for the comments Eno.
> >>>>>>>>>>>>>>> As for exactly once, I don't believe this matters as we are
> >>>>>>>>>>>>>>> just restoring the change-log, i.e., the result of the
> >>>>>>>>>>>>>>> aggregations that previously ran etc. So once initialized the
> >>>>>>>>>>>>>>> state store will be in the same state as it was before.
> >>>>>>>>>>>>>>> Having the checkpoint in a Kafka topic is not ideal as the
> >>>>>>>>>>>>>>> state is per Kafka Streams instance. So each instance would
> >>>>>>>>>>>>>>> need to start with a unique id that is persistent.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska
> >>>>>>>>>>>>>>> <eno.there...@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> As a follow up to my previous comment, have you thought about
> >>>>>>>>>>>>>>>> writing the checkpoint to a topic instead of a local file?
> >>>>>>>>>>>>>>>> That would have the advantage that all metadata continues to
> >>>>>>>>>>>>>>>> be managed by Kafka, as well as fitting with EoS. The
> >>>>>>>>>>>>>>>> potential disadvantage would be higher latency; however, if
> >>>>>>>>>>>>>>>> it is periodic as you mention, I'm not sure that would be a
> >>>>>>>>>>>>>>>> show stopper.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska
> >>>>>>>>>>>>>>>>> <eno.there...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the
> >>>>>>>>>>>>>>>>> restore time.
> >>>>>>>>>>>>>>>>> Looking forward, with exactly once and support for
> >>>>>>>>>>>>>>>>> transactions in Kafka, I believe we'll have to add some
> >>>>>>>>>>>>>>>>> support for rolling back checkpoints, e.g., when a
> >>>>>>>>>>>>>>>>> transaction is aborted. We need to be aware of that and
> >>>>>>>>>>>>>>>>> ideally anticipate those needs a bit in the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy <damian....@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I would like to start the discussion on KIP-116:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Damian
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>
> >> --
> >> -- Guozhang
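
For anyone following the StreamTask failure cases quoted above, here is a small toy model of the four changelog offsets (A, B, C, D) after a crash at each point in the proposed commit order. It is only a sketch under the assumptions stated in Guozhang's mail: `Offsets`, `crash_after` and `restore_plan` are hypothetical names, the offsets are plain integers, and none of this is Kafka Streams code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Offsets:
    a: int  # changelog offset reflected in the local state image
    b: int  # changelog log-end offset
    c: int  # offset recorded in the checkpoint file
    d: int  # changelog offset matching the committed upstream offset


def crash_after(step: int, old: int, new: int) -> Offsets:
    """Offsets left behind if the task crashes after `step` commit steps.

    Assumed order: 1) flush state, 2) flush producer,
    3) write checkpoint file, 4) commit upstream offsets.
    Each completed step moves its offset from `old` to `new`.
    """
    return Offsets(*[new if step >= i else old for i in (1, 2, 3, 4)])


def restore_plan(o: Offsets) -> dict:
    """What a restart would do with these offsets, per the quoted reasoning."""
    return {
        # The changelog is replayed from the checkpoint (C) to the log end (B).
        "records_replayed": max(o.b - o.c, 0),
        # Input is reprocessed from D; if A > D the store has already seen
        # updates that will be applied again.
        "state_ahead_of_input": o.a > o.d,
    }


for step in (1, 2, 3, 4):
    o = crash_after(step, old=100, new=120)
    print(step, o, restore_plan(o))
```

With `old=100` and `new=120`, steps 1 to 3 reproduce the three cases in the mail (A > B = C = D, A = B > C = D, A = B = C > D), and in each of them the state image is ahead of the committed input position. Only after step 4 do all four offsets agree.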