Hi Guozhang,

It seems to me we have the same semantics today. Are you saying there is a new failure scenario?
Thanks,
Eno

> On 9 Feb 2017, at 19:42, Guozhang Wang <wangg...@gmail.com> wrote:
>
> More specifically, here is my reasoning about the failure cases, and I would like to get your feedback:
>
> *StreamTask*
>
> For a stream task, the committing order is 1) flush state (may send more records to the changelog via the producer), 2) flush producer, 3) commit upstream offsets. My understanding is that the writing of the checkpoint file will be between 2) and 3). So the new order will be 1) flush state, 2) flush producer, 3) write checkpoint file (when necessary), 4) commit upstream offsets.
>
> And we have a bunch of "changelog offsets" regarding the state: a) offset corresponding to the image of the persistent file, name it offset A, b) log end offset, name it offset B, c) checkpoint file recorded offset, name it offset C, d) offset corresponding to the current committed upstream offset, name it offset D.
>
> Now let's talk about the failure cases:
>
> If there is a crash between 1) and 2), then A > B = C = D. In this case, if we restore, we will replay no logs at all since B = C while the persistent state file is actually "ahead of time", and we will start reprocessing from the input offset corresponding to D = B < A and hence have some duplicates, *which will be incorrect* if the update logic involves reading the state store values as well (i.e. not a blind write), e.g. aggregations.
>
> If there is a crash between 2) and 3), then A = B > C = D. When we restore, we will replay from C -> B = A, and then start reprocessing from the input offset corresponding to D < A, and the same issue as above applies.
>
> If there is a crash between 3) and 4), then A = B = C > D. When we restore, we will not replay, and then start reprocessing from the input offset corresponding to D < A, and the same issue as above applies.
>
> *StandbyTask*
>
> We only do one operation today, which is 1) flush state; I think we will add the writing of the checkpoint file after it as step 2).
>
> Failure cases again: offset A -> corresponds to the image of the file, offset B -> changelog end offset, offset C -> offset written in the checkpoint file.
>
> If there is a crash between 1) and 2), then B >= A > C (B >= A because we are reading from the changelog topic, so A will never be greater than B),
>
> 1) and if this task resumes as a standby task, we will resume restoration from offset C, and a few duplicates from C -> A will be applied again to the local state files, then continue from A -> B. *This is OK* since they do not incur any computation, hence no side effects, and are all idempotent.
>
> 2) and if this task resumes as a stream task, we will replay changelogs from C -> A, with duplicated updates, and then from A -> B. This is also OK for the same reason as above.
>
> So it seems to me that this is not safe for a StreamTask, or maybe the writing of the checkpoint file in your mind is different?
>
> Guozhang
>
> On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> A quick question re: `We will add the above config parameter to *StreamsConfig*. During *StreamTask#commit()*, *StandbyTask#commit()*, and *GlobalUpdateStateTask#flushState()* we will check if the checkpoint interval has elapsed and write the checkpoint file.`
>>
>> Will the writing of the checkpoint file happen before the flushing of the state manager?
>>
>> Guozhang
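For concreteness, here is a minimal sketch of the four-step commit ordering being discussed above. The interface and method names (StateManager, RecordCollector, OffsetCommitter, etc.) are illustrative stand-ins, not the actual StreamTask internals; the point of contention in the thread is whether step 3) can be made safe given the failure windows enumerated above.

    import java.util.Map;

    // Hypothetical stand-ins for the pieces involved in a commit; illustrative
    // names only, not the real Kafka Streams internals.
    interface StateManager {
        void flush();                                        // 1) may append more records to the changelog
        void checkpoint(Map<String, Long> changelogOffsets); // 3) persist offsets to the local checkpoint file
    }

    interface RecordCollector {
        void flush();                                        // 2) flush the changelog producer
    }

    interface OffsetCommitter {
        void commit(Map<String, Long> consumedOffsets);      // 4) commit upstream input offsets
    }

    class CommitOrderSketch {
        private final StateManager stateManager;
        private final RecordCollector recordCollector;
        private final OffsetCommitter offsetCommitter;

        CommitOrderSketch(final StateManager stateManager,
                          final RecordCollector recordCollector,
                          final OffsetCommitter offsetCommitter) {
            this.stateManager = stateManager;
            this.recordCollector = recordCollector;
            this.offsetCommitter = offsetCommitter;
        }

        // The ordering discussed above: flush state, flush producer, write the
        // checkpoint file when the checkpoint interval has elapsed, then commit
        // the upstream offsets.
        void commit(final boolean checkpointIntervalElapsed,
                    final Map<String, Long> changelogOffsets,
                    final Map<String, Long> consumedOffsets) {
            stateManager.flush();
            recordCollector.flush();
            if (checkpointIntervalElapsed) {
                stateManager.checkpoint(changelogOffsets);
            }
            offsetCommitter.commit(consumedOffsets);
        }
    }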
>> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> But 5 min means that we (in the worst case) need to replay data from the last 5 minutes to get the store ready.
>>>
>>> So why not go with the min possible value of 30 seconds to speed up this process if the impact is negligible anyway?
>>>
>>> What do you gain by being conservative?
>>>
>>> -Matthias
>>>
>>> On 2/9/17 2:54 AM, Damian Guy wrote:
>>>> Why shouldn't it be 5 minutes? ;-)
>>>> It is a finger-in-the-air number. Based on the testing I did, there isn't much, if any, overhead when checkpointing a single store on the commit interval. The default commit interval is 30 seconds, so it could possibly be set to that. However, I'd prefer to be a little conservative, so 5 minutes seemed reasonable.
>>>>
>>>> On Thu, 9 Feb 2017 at 10:25 Michael Noll <mich...@confluent.io> wrote:
>>>>
>>>>> Damian,
>>>>>
>>>>> could you elaborate briefly why the default value should be 5 minutes? What are the considerations, assumptions, etc. that go into picking this value?
>>>>>
>>>>> Right now, in the KIP and in this discussion, "5 mins" looks like a magic number to me. :-)
>>>>>
>>>>> -Michael
>>>>>
>>>>> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>
>>>>>> I've run the SimpleBenchmark with checkpointing on and off to see what the impact is. It appears that there is very little impact, if any. The numbers with checkpointing on actually look better, but that is likely largely due to external influences.
>>>>>>
>>>>>> In any case, I'm going to suggest we go with a default checkpoint interval of 5 minutes. I've updated the KIP with this.
>>>>>>
>>>>>> commit every 10 seconds (no checkpoint)
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/34798/287372.83751939767/29.570664980746017
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/35942/278226.0308274442/28.62945857214401
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/34677/288375.58035585546/29.673847218617528
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/34677/288375.58035585546/29.673847218617528
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/31192/320595.02436522185/32.98922800718133
>>>>>>
>>>>>> checkpoint every 10 seconds (same as commit interval)
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/36997/270292.185852907/27.81306592426413
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/32087/311652.69423754164/32.069062237043035
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/32895/303997.5680194558/31.281349749202004
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/33476/298721.4720994145/30.738439479029754
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]: 10000000/33196/301241.1133871551/30.99771056753826
>>>>>>
>>>>>> On Wed, 8 Feb 2017 at 09:02 Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>>> Matthias,
>>>>>>>
>>>>>>> Fair point. I'll update the KIP.
>>>>>>> Thanks
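To make the numbers above concrete, here is a small hypothetical configuration sketch. The exact parameter name proposed in KIP-116 is not spelled out in this thread, so `statestore.checkpoint.interval.ms` below is a placeholder; the 30-second commit interval is the Streams default and the 5-minute checkpoint interval is the default being proposed.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class CheckpointIntervalConfigSketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "checkpoint-interval-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Commit every 30 seconds (the Streams default commit.interval.ms).
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);

            // Placeholder key for the parameter proposed in KIP-116; the final name
            // is not given in this thread. Because checkpoints are only written
            // during commits, a 5 minute setting means a checkpoint roughly on
            // every 10th commit with the defaults above.
            props.put("statestore.checkpoint.interval.ms", 300_000);
        }
    }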
>>>>>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>> Damian,
>>>>>>>
>>>>>>> I am not strict about it either. However, if there is no advantage in disabling it, we might not want to allow it. This would have the advantage of guarding users from accidentally switching it off.
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 2/3/17 2:03 AM, Damian Guy wrote:
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>> It possibly doesn't make sense to disable it, but then I'm sure someone will come up with a reason they don't want it!
>>>>>>>> I'm happy to change it such that the checkpoint interval must be > 0.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Thanks Damian.
>>>>>>>>>
>>>>>>>>> One more question: "Checkpointing is disabled if the checkpoint interval is set to a value <= 0."
>>>>>>>>>
>>>>>>>>> Does it make sense to disable checkpointing? What's the tradeoff here?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>>
>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>
>>>>>>>>>> 1. TBD - I need to do some performance tests and try to work out a sensible default.
>>>>>>>>>> 2. Yes, you are correct. It could be a multiple of the commit.interval.ms. But that would also mean that if you change the commit interval - say you lower it - then you might also need to change the checkpoint setting (i.e., you still only want to checkpoint every n minutes).
>>>>>>>>>>
>>>>>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP Damian.
>>>>>>>>>>>
>>>>>>>>>>> I am wondering about two things:
>>>>>>>>>>>
>>>>>>>>>>> 1. what should be the default value for the new parameter?
>>>>>>>>>>> 2. why is the new parameter provided in ms?
>>>>>>>>>>>
>>>>>>>>>>> About (2): because
>>>>>>>>>>>
>>>>>>>>>>> "the minimum checkpoint interval will be the value of commit.interval.ms. In effect the actual checkpoint interval will be a multiple of the commit interval"
>>>>>>>>>>>
>>>>>>>>>>> it might be easier to just use a parameter that is a "number of commit intervals".
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
>>>>>>>>>>>> Thanks for the comments Eno.
>>>>>>>>>>>> As for exactly once, I don't believe this matters as we are just restoring the changelog, i.e., the result of the aggregations that previously ran etc. So once initialized, the state store will be in the same state as it was before.
>>>>>>>>>>>> Having the checkpoint in a Kafka topic is not ideal as the state is per Kafka Streams instance. So each instance would need to start with a unique id that is persistent.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Damian
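A small sketch of the point being made about a millisecond-based parameter (the class and method names here are hypothetical): because the elapsed-time check only runs when a commit happens, the effective checkpoint interval rounds up to a multiple of the commit interval, and lowering the commit interval keeps the same wall-clock checkpoint cadence without touching the checkpoint setting.

    // Hypothetical illustration of the checkpoint decision made at commit time.
    class CheckpointSchedule {
        private final long checkpointIntervalMs;
        private long lastCheckpointMs;

        CheckpointSchedule(final long checkpointIntervalMs, final long startMs) {
            this.checkpointIntervalMs = checkpointIntervalMs;
            this.lastCheckpointMs = startMs;
        }

        // Called once per commit; returns true when a checkpoint should be written.
        boolean maybeCheckpoint(final long nowMs) {
            if (nowMs - lastCheckpointMs >= checkpointIntervalMs) {
                lastCheckpointMs = nowMs;
                return true;
            }
            return false;
        }

        public static void main(final String[] args) {
            // With a 30 s commit interval and a 5 min checkpoint interval, a
            // checkpoint fires on every 10th commit. If the commit interval were
            // lowered to 10 s, it would fire on every 30th commit, i.e. still
            // roughly every 5 minutes of wall-clock time.
            final CheckpointSchedule schedule = new CheckpointSchedule(300_000L, 0L);
            for (long now = 30_000L; now <= 600_000L; now += 30_000L) {
                if (schedule.maybeCheckpoint(now)) {
                    System.out.println("checkpoint at commit time " + now + " ms");
                }
            }
        }
    }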
>>>>>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> As a follow up to my previous comment, have you thought about writing the checkpoint to a topic instead of a local file? That would have the advantage that all metadata continues to be managed by Kafka, as well as fitting with EoS. The potential disadvantage would be higher latency; however, if it is periodic as you mention, I'm not sure that would be a show stopper.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the restore time. Looking forward, with exactly once and support for transactions in Kafka, I believe we'll have to add some support for rolling back checkpoints, e.g., when a transaction is aborted. We need to be aware of that and ideally anticipate those needs a bit in the KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would like to start the discussion on KIP-116:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>
>> --
>> -- Guozhang
>
> --
> -- Guozhang