Hi Eno,

Sounds good to me. The only reason i can think of is if we want to be able
to turn it off.
Gouzhang - thoughts?

On Fri, 10 Feb 2017 at 10:28 Eno Thereska <eno.there...@gmail.com> wrote:

> Question: if checkpointing is so cheap why not do it every commit
> interval? That way we can get rid of this extra config variable and just
> use the existing commit interval.
>
> Less tuning knobs.
>
> Eno
>
> > On 10 Feb 2017, at 09:27, Damian Guy <damian....@gmail.com> wrote:
> >
> > Gouzhang,
> >
> > You've confused me. The failure scenarios you have described are the same
> > as they are today. With the checkpoint files in place less data will be
> > replayed, so there will be fewer duplicates.
> >
> > Are you saying you'd like the option to turn checkpointing off?
> >
> > Thanks,
> > Damian
> >
> > On Thu, 9 Feb 2017 at 21:55 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Eno,
> >>
> >> You are right, it is not a new scenario.
> >>
> >> Thinking a bit more on how we could incorporate KIP-98 in Streams, I
> feel
> >> that if EOS is turned on inside Streams, then we probably cannot always
> >> resume from the checkpointed offsets as it is not guaranteed to be
> >> "consistent"; but since EOS may not be turned on by default this is
> still
> >> worthwhile to add this feature I guess.
> >>
> >> About the default config values: I think the default value of 5 min is
> OK
> >> to me, since restoration is usually faster than normal processing
> (unless
> >> your traffic was really high), about allowing it to be "turned off"
> with a
> >> non-positive value: I feel there are still values to keep this door
> open as
> >> in the future if EOS is turned on, people may just want to turn off
> >> checkpointing anyways, or there maybe other scenarios that we have not
> >> realized yet. On the other hand, I would argue that it is less likely
> users
> >> mistakenly set it to a non-positive value.
> >>
> >> Guozhang
> >>
> >> On Thu, Feb 9, 2017 at 1:03 PM, Eno Thereska <eno.there...@gmail.com>
> >> wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> It seems to me we have the same semantics today. Are you saying there
> is
> >> a
> >>> new failure scenario?
> >>>
> >>> Thanks,
> >>> Eno
> >>>
> >>>> On 9 Feb 2017, at 19:42, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>> More specifically, here is my reasoning of failure cases, and would
> >> like
> >>> to
> >>>> get your feedbacks:
> >>>>
> >>>> *StreamTask*
> >>>>
> >>>> For stream-task, the committing order is 1) flush state (may send more
> >>>> records to changelog in producer), 2) flush producer, 3) commit
> >> upstream
> >>>> offsets. My understanding is that the writing of the checkpoint file
> >> will
> >>>> between 2) and 3). So thatt he new order will be 1) flush state, 2)
> >> flush
> >>>> producer, 3) write checkpoint file (when necessary), 4) commit
> upstream
> >>>> offsets.
> >>>>
> >>>> And we have a bunch of "changelog offsets" regarding the state: a)
> >> offset
> >>>> corresponding to the image of the persistent file, name it point A, b)
> >>> log
> >>>> end offset, name it offset B, c) checkpoint file recorded offset, name
> >> it
> >>>> offset C, d) offset corresponding to the current committed upstream
> >>> offset,
> >>>> name it offset D.
> >>>>
> >>>> Now let's talk about the failure cases:
> >>>>
> >>>> If there is a crash between 1) and 2), then A > B = C = D. In this
> >> case,
> >>> if
> >>>> we restore, we will replay no logs at all since B = C while the
> >>> persistent
> >>>> state file is actually "ahead of time", and we will start reprocessing
> >>>> since from the input offset corresponding to D = B < A and hence have
> >>> some
> >>>> duplicated, *which will be incorrect* if the update logic involve
> >> reading
> >>>> the state store values as well (i.e. not a blind write), e.g.
> >>> aggregations.
> >>>>
> >>>> If there is a crash between 2) and 3), then A = B > C = D. When we
> >>> restore,
> >>>> we will replay from C -> B = A, and then start reprocessing from input
> >>>> offset corresponding to D < A, and same issue applies as above.
> >>>>
> >>>> If there is a crash between 3) and 4), then A = B = C > D. When we
> >>> restore,
> >>>> we will not replay, and then start reprocessing from input offset
> >>>> corresponding to D < A, and same issue applies as above.
> >>>>
> >>>>
> >>>> *StandbyTask*
> >>>>
> >>>> We only do one operation today, which is 1) flush state, I think we
> >> will
> >>>> add the writing of the checkpoint file after it as step 2).
> >>>>
> >>>> Failure cases again: offset A -> correspond to the image of the file,
> >>>> offset B -> changelog end offset, offset C -> written as in the
> >>> checkpoint
> >>>> file.
> >>>>
> >>>> If there is a crash between 1) and 2), then B >= A > C (B >= A because
> >> we
> >>>> are reading from changelog topic so A will never be greater than B),
> >>>>
> >>>> 1) and if this task resumes as a standby task, we will resume
> >> restoration
> >>>> from offset C, and a few duplicates from C -> A will be applied again
> >> to
> >>>> local state files, then continue from A -> B, *this is OK* since they
> >> do
> >>>> not incur any computations hence no side effects and are all
> >> idempotent.
> >>>>
> >>>> 2) and if this task resumes as a stream task, we will replay
> changelogs
> >>>> from C -> A, with duplicated updates, and then from A -> B. This is
> >> also
> >>> OK
> >>>> for the same reason as above.
> >>>>
> >>>>
> >>>>
> >>>> So it seems to me that this is not safe for a StreamTask, or maybe the
> >>>> writing of the checkpoint file in your mind is different?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> A quick question re: `We will add the above config parameter to
> >>>>> *StreamsConfig*. During *StreamTask#commit()*,
> *StandbyTask#commit()*,
> >>>>> and *GlobalUpdateStateTask#flushState()* we will check if the
> >>> checkpoint
> >>>>> interval has elapsed and write the checkpoint file.`
> >>>>>
> >>>>> Will the writing of the checkpoint file happen before the flushing of
> >>> the
> >>>>> state manager?
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax <
> >> matth...@confluent.io
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> But 5 min means, that we (in the worst case) need to reply data from
> >>> the
> >>>>>> last 5 minutes to get the store ready.
> >>>>>>
> >>>>>> So why not go with the min possible value of 30 seconds to speed up
> >>> this
> >>>>>> process if the impact is negligible anyway?
> >>>>>>
> >>>>>> What do you gain by being conservative?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 2/9/17 2:54 AM, Damian Guy wrote:
> >>>>>>> Why shouldn't it be 5 minutes? ;-)
> >>>>>>> It is a finger in the air number. Based on the testing i did it
> >> shows
> >>>>>> that
> >>>>>>> there isn't much, if any, overhead when checkpointing a single
> store
> >>> on
> >>>>>> the
> >>>>>>> commit interval. The default commit interval is 30 seconds, so it
> >>> could
> >>>>>>> possibly be set to that. However, i'd prefer to be a little
> >>>>>> conservative so
> >>>>>>> 5 minutes seemed reasonable.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, 9 Feb 2017 at 10:25 Michael Noll <mich...@confluent.io>
> >>> wrote:
> >>>>>>>
> >>>>>>>> Damian,
> >>>>>>>>
> >>>>>>>> could you elaborate briefly why the default value should be 5
> >>> minutes?
> >>>>>>>> What are the considerations, assumptions, etc. that go into
> picking
> >>>>>> this
> >>>>>>>> value?
> >>>>>>>>
> >>>>>>>> Right now, in the KIP and in this discussion, "5 mins" looks like
> a
> >>>>>> magic
> >>>>>>>> number to me. :-)
> >>>>>>>>
> >>>>>>>> -Michael
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy <damian....@gmail.com
> >
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I've ran the SimpleBenchmark with checkpoint on and off to see
> >> what
> >>>>>> the
> >>>>>>>>> impact is. It appears that there is very little impact, if any.
> >> The
> >>>>>>>> numbers
> >>>>>>>>> with checkpointing on actually look better, but that is likely
> >>> largely
> >>>>>>>> due
> >>>>>>>>> to external influences.
> >>>>>>>>>
> >>>>>>>>> In any case, i'm going to suggest we go with a default checkpoint
> >>>>>>>> interval
> >>>>>>>>> of 5 minutes. I've update the KIP with this.
> >>>>>>>>>
> >>>>>>>>> commit every 10 seconds (no checkpoint)
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/34798/287372.83751939767/29.570664980746017
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/35942/278226.0308274442/28.62945857214401
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/31192/320595.02436522185/32.98922800718133
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> checkpoint every 10 seconds (same as commit interval)
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/36997/270292.185852907/27.81306592426413
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/32087/311652.69423754164/32.069062237043035
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/32895/303997.5680194558/31.281349749202004
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/33476/298721.4720994145/30.738439479029754
> >>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
> source+store]:
> >>>>>>>>> 10000000/33196/301241.1133871551/30.99771056753826
> >>>>>>>>>
> >>>>>>>>> On Wed, 8 Feb 2017 at 09:02 Damian Guy <damian....@gmail.com>
> >>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Matthias,
> >>>>>>>>>>
> >>>>>>>>>> Fair point. I'll update it the KIP.
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax <
> >> matth...@confluent.io
> >>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Damian,
> >>>>>>>>>>
> >>>>>>>>>> I am not strict about it either. However, if there is no
> >> advantage
> >>> in
> >>>>>>>>>> disabling it, we might not want to allow it. This would have the
> >>>>>>>>>> advantage to guard users to accidentally switch it off.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 2/3/17 2:03 AM, Damian Guy wrote:
> >>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>
> >>>>>>>>>>> It possibly doesn't make sense to disable it, but then i'm sure
> >>>>>>>> someone
> >>>>>>>>>>> will come up with a reason they don't want it!
> >>>>>>>>>>> I'm happy to change it such that the checkpoint interval must
> >> be >
> >>>>>> 0.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Damian
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax <
> >>> matth...@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Damian.
> >>>>>>>>>>>>
> >>>>>>>>>>>> One more question: "Checkpointing is disabled if the
> checkpoint
> >>>>>>>>> interval
> >>>>>>>>>>>> is set to a value <=0."
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Does it make sense to disable check pointing? What's the
> >> tradeoff
> >>>>>>>>> here?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
> >>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the comments.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. TBD - i need to do some performance tests and try and work
> >>> out
> >>>>>> a
> >>>>>>>>>>>>> sensible default.
> >>>>>>>>>>>>> 2. Yes, you are correct. It could be a multiple of the
> >>>>>>>>>>>> commit.interval.ms.
> >>>>>>>>>>>>> But, that would also mean if you change the commit interval -
> >>> say
> >>>>>>>> you
> >>>>>>>>>>>> lower
> >>>>>>>>>>>>> it, then you might also need to change the checkpoint setting
> >>>>>> (i.e,
> >>>>>>>>> you
> >>>>>>>>>>>>> still only want to checkpoint every n minutes).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax <
> >>>>>> matth...@confluent.io
> >>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the KIP Damian.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am wondering about two things:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. what should be the default value for the new parameter?
> >>>>>>>>>>>>>> 2. why is the new parameter provided in ms?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About (2): because
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "the minimum checkpoint interval will be the value of
> >>>>>>>>>>>>>> commit.interval.ms. In effect the actual checkpoint
> interval
> >>>>>> will
> >>>>>>>>> be
> >>>>>>>>>> a
> >>>>>>>>>>>>>> multiple of the commit interval"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> it might be easier to just use an parameter that is
> >>>>>>>>> "number-or-commit
> >>>>>>>>>>>>>> intervals".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
> >>>>>>>>>>>>>>> Thanks for the comments Eno.
> >>>>>>>>>>>>>>> As for exactly once, i don't believe this matters as we are
> >>> just
> >>>>>>>>>>>>>> restoring
> >>>>>>>>>>>>>>> the change-log, i.e, the result of the aggregations that
> >>>>>>>> previously
> >>>>>>>>>> ran
> >>>>>>>>>>>>>>> etc. So once initialized the state store will be in the
> same
> >>>>>>>> state
> >>>>>>>>> as
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>> was before.
> >>>>>>>>>>>>>>> Having the checkpoint in a kafka topic is not ideal as the
> >>> state
> >>>>>>>> is
> >>>>>>>>>> per
> >>>>>>>>>>>>>>> kafka streams instance. So each instance would need to
> start
> >>>>>>>> with a
> >>>>>>>>>>>>>> unique
> >>>>>>>>>>>>>>> id that is persistent.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska <
> >>>>>> eno.there...@gmail.com
> >>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> As a follow up to my previous comment, have you thought
> >> about
> >>>>>>>>>> writing
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> checkpoint to a topic instead of a local file? That would
> >>> have
> >>>>>>>> the
> >>>>>>>>>>>>>>>> advantage that all metadata continues to be managed by
> >> Kafka,
> >>>>>> as
> >>>>>>>>>> well
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>>>> fit with EoS. The potential disadvantage would be a slower
> >>>>>>>>> latency,
> >>>>>>>>>>>>>> however
> >>>>>>>>>>>>>>>> if it is periodic as you mention, I'm not sure that would
> >> be
> >>> a
> >>>>>>>>> show
> >>>>>>>>>>>>>> stopper.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska <
> >>> eno.there...@gmail.com
> >>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the
> >>> restore
> >>>>>>>>>> time.
> >>>>>>>>>>>>>>>> Looking forward, with exactly once and support for
> >>> transactions
> >>>>>>>> in
> >>>>>>>>>>>>>> Kafka, I
> >>>>>>>>>>>>>>>> believe we'll have to add some support for rolling back
> >>>>>>>>> checkpoints,
> >>>>>>>>>>>>>> e.g.,
> >>>>>>>>>>>>>>>> when a transaction is aborted. We need to be aware of that
> >>> and
> >>>>>>>>>> ideally
> >>>>>>>>>>>>>>>> anticipate a bit those needs in the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy <
> >> damian....@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I would like to start the discussion on KIP-116:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>> 116+-+Add+State+Store+Checkpoint+Interval+Configuration
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>
>

Reply via email to