Hi Guozhang,

It seems to me we have the same semantics today. Are you saying there is a new 
failure scenario? 

Thanks,
Eno

> On 9 Feb 2017, at 19:42, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> More specifically, here is my reasoning about the failure cases; I would
> like to get your feedback:
> 
> *StreamTask*
> 
> For a stream task, the committing order is 1) flush state (which may send
> more records to the changelog via the producer), 2) flush the producer,
> 3) commit upstream offsets. My understanding is that the writing of the
> checkpoint file will happen between 2) and 3), so the new order will be
> 1) flush state, 2) flush producer, 3) write checkpoint file (when
> necessary), 4) commit upstream offsets.
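> 
> For concreteness, a minimal sketch of that ordering (the method and field
> names here are illustrative, not the actual Streams internals):
> 
>     // Hypothetical shape of StreamTask#commit() under the proposed order.
>     void commit() {
>         stateMgr.flush();              // 1) flush stores; may append more changelog records
>         producer.flush();              // 2) flush buffered changelog/output records
>         if (checkpointIntervalElapsed()) {
>             stateMgr.checkpoint();     // 3) write changelog offsets to the checkpoint file
>         }
>         consumer.commitSync(offsets);  // 4) commit upstream input offsets
>     }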
> 
> And we have a bunch of "changelog offsets" regarding the state:
> 
> a) the offset corresponding to the image of the persistent state file: offset A,
> b) the changelog end offset: offset B,
> c) the offset recorded in the checkpoint file: offset C,
> d) the offset corresponding to the currently committed upstream offset: offset D.
> 
> Now let's talk about the failure cases:
> 
> If there is a crash between 1) and 2), then A > B = C = D. In this case,
> if we restore, we will replay no changelog records at all since B = C,
> while the persistent state file is actually "ahead"; we will then start
> reprocessing from the input offset corresponding to D = B < A and hence
> apply some duplicates, *which will be incorrect* if the update logic
> involves reading the state store values as well (i.e., it is not a blind
> write), e.g. aggregations.
> 
> If there is a crash between 2) and 3), then A = B > C = D. When we
> restore, we will replay from C -> B = A, and then start reprocessing from
> the input offset corresponding to D < A; the same issue as above applies.
> 
> If there is a crash between 3) and 4), then A = B = C > D. When we
> restore, we will not replay at all, and then start reprocessing from the
> input offset corresponding to D < A; the same issue as above applies.
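> 
> To make this concrete with made-up numbers: suppose the store image
> reflects changelog offset 100 (A = 100), while the last completed commit
> left B = C = D = 90. After a crash between 1) and 2) we replay nothing
> since C = B, resume from the input offset matching D = 90, and re-apply
> ten updates on top of a store that already contains their effects; that
> is fine for blind writes but wrong for read-modify-write logic such as
> counts, which would be incremented twice.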
> 
> 
> *StandbyTask*
> 
> We only do one operation today, which is 1) flush state; I think we will
> add the writing of the checkpoint file after it as step 2).
> 
> Failure cases again: offset A corresponds to the image of the state file,
> offset B is the changelog end offset, and offset C is the offset written
> in the checkpoint file.
> 
> If there is a crash between 1) and 2), then B >= A > C (B >= A because we
> are reading from the changelog topic, so A will never be greater than B).
> Then:
> 
> 1) If this task resumes as a standby task, we will resume restoration
> from offset C; a few duplicates from C -> A will be applied again to the
> local state files, and then we continue from A -> B. *This is OK* since
> these updates do not incur any computation, have no side effects, and are
> idempotent.
> 
> 2) If this task resumes as a stream task, we will replay the changelogs
> from C -> A, with duplicated updates, and then from A -> B. This is also
> OK, for the same reason as above.
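> 
> As a sketch of why that replay is idempotent (illustrative code, not the
> actual restoration logic): restoring simply writes the changelog records
> back into the store, with no read-modify-write involved:
> 
>     // Each changelog record is applied as a blind put, so replaying the
>     // C -> A range a second time leaves the store in the same state.
>     for (ConsumerRecord<byte[], byte[]> record : changelogRecords) {
>         store.put(record.key(), record.value());
>     }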
> 
> 
> 
> So it seems to me that this is not safe for a StreamTask; or maybe the
> writing of the checkpoint file that you have in mind works differently?
> 
> 
> Guozhang
> 
> 
> 
> On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> A quick question re: `We will add the above config parameter to
>> *StreamsConfig*. During *StreamTask#commit()*, *StandbyTask#commit()*,
>> and *GlobalUpdateStateTask#flushState()* we will check if the checkpoint
>> interval has elapsed and write the checkpoint file.`
>> 
>> Will the writing of the checkpoint file happen before the flushing of the
>> state manager?
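>> 
>> For reference, I would imagine the elapsed-interval check looking
>> roughly like this (all names made up, just to pin down the question):
>> 
>>     // Hypothetical check inside commit(); does it run before or after
>>     // the state manager flush?
>>     long now = time.milliseconds();
>>     if (now - lastCheckpointTimeMs >= checkpointIntervalMs) {
>>         stateMgr.checkpoint();
>>         lastCheckpointTimeMs = now;
>>     }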
>> 
>> Guozhang
>> 
>> 
>> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>> 
>>> But 5 min means that we (in the worst case) need to replay data from the
>>> last 5 minutes to get the store ready.
>>> 
>>> So why not go with the minimum possible value of 30 seconds to speed up
>>> this process, if the impact is negligible anyway?
>>> 
>>> What do you gain by being conservative?
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 2/9/17 2:54 AM, Damian Guy wrote:
>>>> Why shouldn't it be 5 minutes? ;-)
>>>> It is a finger-in-the-air number. Based on the testing I did, it shows
>>>> that there isn't much, if any, overhead when checkpointing a single
>>>> store on the commit interval. The default commit interval is 30
>>>> seconds, so it could possibly be set to that. However, I'd prefer to
>>>> be a little conservative, so 5 minutes seemed reasonable.
>>>> 
>>>> 
>>>> On Thu, 9 Feb 2017 at 10:25 Michael Noll <mich...@confluent.io> wrote:
>>>> 
>>>>> Damian,
>>>>> 
>>>>> could you elaborate briefly why the default value should be 5 minutes?
>>>>> What are the considerations, assumptions, etc. that go into picking
>>>>> this value?
>>>>> 
>>>>> Right now, in the KIP and in this discussion, "5 mins" looks like a
>>>>> magic number to me. :-)
>>>>> 
>>>>> -Michael
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>> 
>>>>>> I've run the SimpleBenchmark with checkpointing on and off to see
>>>>>> what the impact is. It appears that there is very little impact, if
>>>>>> any. The numbers with checkpointing on actually look better, but
>>>>>> that is likely largely due to external influences.
>>>>>> 
>>>>>> In any case, I'm going to suggest we go with a default checkpoint
>>>>>> interval of 5 minutes. I've updated the KIP with this.
>>>>>> 
>>>>>> commit every 10 seconds (no checkpoint)
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/34798/287372.83751939767/29.570664980746017
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/35942/278226.0308274442/28.62945857214401
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/31192/320595.02436522185/32.98922800718133
>>>>>> 
>>>>>> 
>>>>>> checkpoint every 10 seconds (same as commit interval)
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/36997/270292.185852907/27.81306592426413
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/32087/311652.69423754164/32.069062237043035
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/32895/303997.5680194558/31.281349749202004
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/33476/298721.4720994145/30.738439479029754
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/33196/301241.1133871551/30.99771056753826
>>>>>> 
>>>>>> On Wed, 8 Feb 2017 at 09:02 Damian Guy <damian....@gmail.com> wrote:
>>>>>> 
>>>>>>> Matthias,
>>>>>>> 
>>>>>>> Fair point. I'll update the KIP.
>>>>>>> Thanks
>>>>>>> 
>>>>>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>> 
>>>>>>> Damian,
>>>>>>> 
>>>>>>> I am not strict about it either. However, if there is no advantage in
>>>>>>> disabling it, we might not want to allow it. This would have the
>>>>>>> advantage of guarding users against accidentally switching it off.
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> 
>>>>>>> On 2/3/17 2:03 AM, Damian Guy wrote:
>>>>>>>> Hi Matthias,
>>>>>>>> 
>>>>>>>> It possibly doesn't make sense to disable it, but then I'm sure
>>>>>>>> someone will come up with a reason they don't want it!
>>>>>>>> I'm happy to change it such that the checkpoint interval must be > 0.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Damian
>>>>>>>> 
>>>>>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> Thanks Damian.
>>>>>>>>> 
>>>>>>>>> One more question: "Checkpointing is disabled if the checkpoint
>>>>>>>>> interval is set to a value <= 0."
>>>>>>>>> 
>>>>>>>>> Does it make sense to disable checkpointing? What's the tradeoff
>>>>>>>>> here?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -Matthias
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the comments.
>>>>>>>>>> 
>>>>>>>>>> 1. TBD - I need to do some performance tests and try to work out
>>>>>>>>>> a sensible default.
>>>>>>>>>> 2. Yes, you are correct. It could be a multiple of
>>>>>>>>>> commit.interval.ms. But that would also mean that if you change
>>>>>>>>>> the commit interval - say you lower it - then you might also need
>>>>>>>>>> to change the checkpoint setting (i.e., you still only want to
>>>>>>>>>> checkpoint every n minutes); see the example below.
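>>>>>>>>>> 
>>>>>>>>>> For example (made-up numbers): with commit.interval.ms=30000, a
>>>>>>>>>> multiple of 10 gives a 5-minute checkpoint cadence; lower the
>>>>>>>>>> commit interval to 10000 and the same multiple now checkpoints
>>>>>>>>>> every 100 seconds, so you would have to raise the multiple to 30
>>>>>>>>>> to keep checkpointing every 5 minutes. An interval expressed in
>>>>>>>>>> ms keeps the 5-minute cadence either way.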
>>>>>>>>>> 
>>>>>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Thanks for the KIP Damian.
>>>>>>>>>>> 
>>>>>>>>>>> I am wondering about two things:
>>>>>>>>>>> 
>>>>>>>>>>> 1. what should be the default value for the new parameter?
>>>>>>>>>>> 2. why is the new parameter provided in ms?
>>>>>>>>>>> 
>>>>>>>>>>> About (2): because
>>>>>>>>>>> 
>>>>>>>>>>> "the minimum checkpoint interval will be the value of
>>>>>>>>>>> commit.interval.ms. In effect the actual checkpoint interval
>>>>>>>>>>> will be a multiple of the commit interval"
>>>>>>>>>>> 
>>>>>>>>>>> it might be easier to just use a parameter that is a
>>>>>>>>>>> "number-of-commit-intervals".
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -Matthias
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
>>>>>>>>>>>> Thanks for the comments Eno.
>>>>>>>>>>>> As for exactly-once, I don't believe this matters, as we are
>>>>>>>>>>>> just restoring the changelog, i.e., the result of the
>>>>>>>>>>>> aggregations that previously ran, etc. So once initialized, the
>>>>>>>>>>>> state store will be in the same state as it was before.
>>>>>>>>>>>> Having the checkpoint in a Kafka topic is not ideal, as the
>>>>>>>>>>>> state is per Kafka Streams instance. So each instance would
>>>>>>>>>>>> need to start with a unique id that is persistent.
>>>>>>>>>>>> 
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Damian
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> As a follow-up to my previous comment, have you thought about
>>>>>>>>>>>>> writing the checkpoint to a topic instead of a local file? That
>>>>>>>>>>>>> would have the advantage that all metadata continues to be
>>>>>>>>>>>>> managed by Kafka, as well as fitting with EoS. The potential
>>>>>>>>>>>>> disadvantage would be higher latency; however, if it is
>>>>>>>>>>>>> periodic as you mention, I'm not sure that would be a show
>>>>>>>>>>>>> stopper.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the restore
>>>>>>>>>>>>>> time. Looking forward, with exactly-once and support for
>>>>>>>>>>>>>> transactions in Kafka, I believe we'll have to add some
>>>>>>>>>>>>>> support for rolling back checkpoints, e.g., when a transaction
>>>>>>>>>>>>>> is aborted. We need to be aware of that and ideally anticipate
>>>>>>>>>>>>>> those needs a bit in the KIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would like to start the discussion on KIP-116:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>> 
>> 
>> --
>> -- Guozhang
>> 
> 
> -- 
> -- Guozhang
