Completely agree. Very good and deep analysis!

-Matthias

On 2/6/17 3:15 AM, Krzysztof Lesniewski, Nexiot AG wrote:
> Thank you Matthias for your answer.
> 
> In general I agree with what you are saying. I was, however, thinking
> about one case in which I think the at-least-once delivery semantics
> may still be broken. I came to the conclusion, though, that it does not
> really matter in this application. Here is the case:
> 
> Let's assume we have an application which consumes numbers and for each
> number pushes downstream the accumulated sum of all numbers seen so far,
> including the current one (offsets are marked with letters):
> 
> (a) 1, (b) 3, (c) 2, (d) 1, (e) 4 => Streams application =>
> (a) 1, (b) 4, (c) 6, (d) 7, (e) 11
> 
> The state stores the offset of the last message affecting the state and
> the accumulated sum up to and including that message, which is then used
> together with the next message to compute the new accumulated sum. That
> is, after processing message (a) the state becomes <a, 1>, after
> processing message (b) the state becomes <b, 4>, and so on.
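> 
> To make this concrete, here is a minimal Processor API sketch of what I
> mean (assuming the 0.10.x Processor API; the store name "sum-store", the
> String/Long types, and the serde handling are just assumptions for
> illustration, not our actual code):
> 
>     import org.apache.kafka.streams.processor.Processor;
>     import org.apache.kafka.streams.processor.ProcessorContext;
>     import org.apache.kafka.streams.state.KeyValueStore;
> 
>     public class AccumulatingSumProcessor implements Processor<String, Long> {
> 
>         private ProcessorContext context;
>         // value = {offset of the last record applied, accumulated sum}
>         private KeyValueStore<String, long[]> store;
> 
>         @Override
>         @SuppressWarnings("unchecked")
>         public void init(ProcessorContext context) {
>             this.context = context;
>             this.store = (KeyValueStore<String, long[]>) context.getStateStore("sum-store");
>         }
> 
>         @Override
>         public void process(String key, Long value) {
>             long[] state = store.get(key);                // {lastOffset, sum}, or null
>             long sum = (state == null ? 0L : state[1]) + value;
>             store.put(key, new long[]{context.offset(), sum});
>             context.forward(key, sum);                    // push the accumulated sum downstream
>         }
> 
>         @Override
>         public void punctuate(long timestamp) { }
> 
>         @Override
>         public void close() { }
>     }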
> 
> Now let's assume the following scenario:
> 
> 1. The application starts consuming messages and consumes the first 3
>    messages (a), (b), and (c).
> 2. The application crashes. The state store's changes were successfully
>    written to its backing changelog topic, whereas the write to the
>    downstream topic failed. Hence:
>      * Changelog topic: {K: <c, 6>}
>      * Downstream topic: []
> 3. The application resumes. The state store gets restored and has the
>    value <c, 6>.
> 4. The application starts consuming messages. Since no offset was
>    committed, it starts processing again from message (a).
> 5. Message (a) is evaluated against the kept state. Because the offset
>    in the state (c) is larger than (a), there is not enough information
>    to process message (a), so the message is discarded. The same happens
>    to message (b).
> 6. Message (c) is processed. As it has the same offset as the one in the
>    state, we can determine the message that should be written to the
>    downstream topic, so we push a message with value 6.
> 7. Messages (d) and (e) are processed normally.
> 8. The downstream topic now contains messages [(c) 6, (d) 7, (e) 11].
> 
> In this scenario we lost the output of processing messages (a) and (b).
> However, if we are only interested in the current accumulated sum, we can
> still read it, so losing those messages did not change the final result.
> 
> If we really needed to, we could keep the state as a map <K, map<offset,
> state>>. This way the processing would be idempotent and we could still
> produce the output for messages (a) and (b), as sketched below. It adds
> extra complexity, however: we need to maintain the map over time by
> deleting entries older than the committed offset.
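> 
> For illustration only, a rough sketch of that idea (the store name and
> layout are assumed, serde handling is left out, and how to learn the
> committed offset for pruning is exactly the open question):
> 
>     import java.util.Map;
>     import java.util.TreeMap;
> 
>     import org.apache.kafka.streams.processor.Processor;
>     import org.apache.kafka.streams.processor.ProcessorContext;
>     import org.apache.kafka.streams.state.KeyValueStore;
> 
>     public class IdempotentSumProcessor implements Processor<String, Long> {
> 
>         private ProcessorContext context;
>         // assumed layout per key: offset -> accumulated sum at that offset
>         private KeyValueStore<String, TreeMap<Long, Long>> store;
> 
>         @Override
>         @SuppressWarnings("unchecked")
>         public void init(ProcessorContext context) {
>             this.context = context;
>             this.store = (KeyValueStore<String, TreeMap<Long, Long>>)
>                 context.getStateStore("sum-history-store");
>         }
> 
>         @Override
>         public void process(String key, Long value) {
>             TreeMap<Long, Long> history = store.get(key);
>             if (history == null) {
>                 history = new TreeMap<>();
>             }
>             long offset = context.offset();
>             Long sum = history.get(offset);
>             if (sum == null) {                            // first time we see this offset
>                 Map.Entry<Long, Long> prev = history.lowerEntry(offset);
>                 sum = (prev == null ? 0L : prev.getValue()) + value;
>                 history.put(offset, sum);
>                 // open question: prune entries older than the committed offset,
>                 // e.g. history.headMap(committedOffset).clear();
>                 store.put(key, history);
>             }
>             context.forward(key, sum);                    // same offset always re-emits the same sum
>         }
> 
>         @Override
>         public void punctuate(long timestamp) { }
> 
>         @Override
>         public void close() { }
>     }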
> 
> What do you think Matthias?
> 
> Kind Regards
> Krzysztof Lesniewski
> 
> On 03.02.2017 20:02, Matthias J. Sax wrote:
>> Answers inline.
>>
>> -Matthias
>>
>>
>> On 2/3/17 7:37 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>> Thank you Eno for the information on KIP-98. Making writes to the
>>> downstream topic and the state store's changelog atomic would indeed
>>> simplify the problem. I did not dive into the design, so I am not able
>>> to tell if it would bring other implications, but as KIP-98 is still
>>> under discussion, I have to settle on some other solution. It is
>>> valuable that you brought it up, though.
>>>
>>> Matthias, from what you wrote I understand that even if state gets
>>> flushed to a local state store (e.g. RocksDB) but does not reach the
>>> underlying changelog before the crash, then after restarting the
>>> application the state gets restored from the changelog; hence a
>>> successful flush to the local state store does not matter in that case,
>>> as the local state store will be replaced with the restored one. Is
>>> that correct?
>> Yes. Exactly like this.
>>
>>
>>> My previous post was based on an invalid assumption that updates to
>>> the state (changelog topic) are flushed before updates to the
>>> downstream topic (that is, updates to the downstream topic are sent
>>> only after all updates to the changelog are acknowledged). I looked at
>>> the code, and from what I understood, updates to all the topics
>>> (changelog and downstream) are sent at more or less the same time. I
>>> could not quickly determine, though, whether it is possible (in case of
>>> failure) that a change to the state store's changelog was successfully
>>> written while a related change to the downstream topic was not. As the
>>> writes are not atomic, I would assume there is such a possibility,
>>> which in rare cases could cause losing a message.
>> Yes and no.
>>
>> Yes, it could happen that a state change is flushed to the changelog
>> topic but not to the downstream result topic.
>>
>> No, this would not result in a loss of data. Because the flush to the
>> downstream result topic failed, no offset commit for the input record
>> will be done. Thus, the input record doing the state update will get
>> reprocessed, recomputing the "lost" downstream result (if the operation
>> is idempotent).
>>
>> For this case, if you detect a duplicate record (and to avoid data
>> loss), you will not update the list in <K, list<V>> with the duplicate
>> record, but you will need to recompute the aggregate result and emit it
>> downstream.
>>
>> In case the result got flushed to both the changelog and the result
>> topic, you will still apply the same strategy of not updating the state
>> but recomputing the aggregate (because you cannot know which of the two
>> cases is true), and you will write the same result a second time to the
>> result topic and get a duplicate (but that aligns with
>> "at-least-once").
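>>
>> Just to illustrate the idea, a rough sketch (assuming the 0.10.x
>> Processor API; the store name "list-store", the types, and the state
>> class are made up, and it assumes the state value holds the list plus
>> the offset of the last record that modified it):
>>
>>     import java.util.ArrayList;
>>     import java.util.List;
>>
>>     import org.apache.kafka.streams.processor.Processor;
>>     import org.apache.kafka.streams.processor.ProcessorContext;
>>     import org.apache.kafka.streams.state.KeyValueStore;
>>
>>     public class DedupListAggregator implements Processor<String, Long> {
>>
>>         // assumed state value: the list of values plus the offset of the last update
>>         public static class ListState {
>>             long lastOffset = -1L;
>>             final List<Long> values = new ArrayList<>();
>>         }
>>
>>         private ProcessorContext context;
>>         private KeyValueStore<String, ListState> store;
>>
>>         @Override
>>         @SuppressWarnings("unchecked")
>>         public void init(ProcessorContext context) {
>>             this.context = context;
>>             this.store = (KeyValueStore<String, ListState>) context.getStateStore("list-store");
>>         }
>>
>>         @Override
>>         public void process(String key, Long value) {
>>             ListState state = store.get(key);
>>             if (state == null) {
>>                 state = new ListState();
>>             }
>>             if (context.offset() > state.lastOffset) {    // new record: update the state
>>                 state.values.add(value);
>>                 state.lastOffset = context.offset();
>>                 store.put(key, state);
>>             }
>>             // duplicate or not: recompute the aggregate and emit it again, so a
>>             // result that was lost before the crash gets re-produced downstream
>>             long sum = 0L;
>>             for (long v : state.values) {
>>                 sum += v;
>>             }
>>             context.forward(key, sum);
>>         }
>>
>>         @Override
>>         public void punctuate(long timestamp) { }
>>
>>         @Override
>>         public void close() { }
>>     }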
>>
>>> Nevertheless, in my use case such a loss in rare circumstances is
>>> acceptable, and therefore the extra complexity required to avoid it is
>>> unnecessary. I will then go for the solution you proposed, storing
>>> <K, <Id, V>>. I would appreciate it, though, if you could verify
>>> whether what I wrote above is correct.
>>>
>>> Kind Regards
>>> Krzysztof Lesniewski
>>>
>>> On 03.02.2017 01:06, Matthias J. Sax wrote:
>>>> Your assumption is not completely correct.
>>>>
>>>> After a crash and state store restore, the store will contain exactly
>>>> the same data as was written to the underlying changelog. Thus, if
>>>> your update was buffered but never sent, the store will not contain
>>>> the update after the restore, and thus the record will not be
>>>> considered a duplicate.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 2/2/17 12:55 PM, Eno Thereska wrote:
>>>>> Hi Krzysztof,
>>>>>
>>>>> There are several scenarios where you want a set of records to be
>>>>> sent atomically (to a state store, downstream topics, etc.). In case
>>>>> of failure, either all of them commit successfully or none does. We
>>>>> are working to add exactly-once processing to Kafka Streams, and I
>>>>> suspect you'll find that useful. It will build on KIP-98, which is
>>>>> currently being discussed, covering exactly-once delivery and
>>>>> transactional messaging.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>> On 2 Feb 2017, at 20:01, Krzysztof Lesniewski, Nexiot AG
>>>>>> <krzysztof.lesniew...@nexiot.ch> wrote:
>>>>>>
>>>>>> Thank you Matthias for your answer.
>>>>>>
>>>>>> Of course, wherever possible I will use idempotent updates, but
>>>>>> unfortunately that does not apply to all my cases.
>>>>>>
>>>>>> I thought before about the alternative to idempotent updates you
>>>>>> proposed, but I think it carries a risk of breaking at-least-once
>>>>>> delivery semantics in rare cases. Given that the state store
>>>>>> changelog is flushed first, if an application crashes after flushing
>>>>>> the state store but before flushing all the produced records, a
>>>>>> reprocessed record may be discarded (considered a duplicate), even
>>>>>> though there is a chance its processing result never reached the
>>>>>> destination topic. That is:
>>>>>>
>>>>>> 1. A record with offset (or other identifier) A is processed. The
>>>>>>    state store updates the state and sets the Id to A.
>>>>>> 2. On commit:
>>>>>>      1. state store changes are flushed
>>>>>>      2. the server crashes: the result of processing record A was
>>>>>>         buffered, but never sent
>>>>>> 3. The state store is restored.
>>>>>> 4. The record with offset A is reprocessed, but because its Id is
>>>>>>    already in the state store, it is considered a duplicate and
>>>>>>    discarded.
>>>>>>
>>>>>> An extension to your solution would be to additionally store a map
>>>>>> of states for past messages, <K, <V, Id, Map<Id, V>>>, and then, if
>>>>>> the offset in the state is greater than the current record's offset,
>>>>>> the state valid for the current record can be retrieved from the
>>>>>> map. Of course it adds extra complexity, as the map has to be
>>>>>> maintained so it does not grow indefinitely. An easy way to remove
>>>>>> old entries would be to access the committed offset and delete all
>>>>>> entries before it, but I did not find an easy way to access the
>>>>>> committed offset.
>>>>>>
>>>>>> Is my thinking correct here? How could I maintain such a state
>>>>>> store, and are there other gotchas I should pay attention to?
>>>>>>
>>>>>> Kind Regards
>>>>>> *Krzysztof Lesniewski*
>>>>>>
>>>>>> On 02.02.2017 18:59, Matthias J. Sax wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> About message acks: writes will be acked, however asynchronously
>>>>>>> (not synchronously as you describe it). Only before an actual
>>>>>>> commit is KafkaProducer#flush called; all not-yet-received acks are
>>>>>>> collected (i.e., blocking/sync) before the commit is done.
>>>>>>>
>>>>>>> About state guarantees: there are none -- state might be between X
>>>>>>> and Z
>>>>>>> (that's what "at-least-once" can provide -- not more). Thus, it's
>>>>>>> recommended to apply idempotent updates to the stores if possible.
>>>>>>>
>>>>>>> As an alternative, you might also be able to add the latest
>>>>>>> "update record" to the state itself to deduplicate in the failure
>>>>>>> case. For example, instead of putting <K, V> in the state, you put
>>>>>>> <K, <V, Id>>, with Id being the id of the record that did the last
>>>>>>> modification on V. Thus, each time before you update the state, you
>>>>>>> can check whether you already "added" the current record to the
>>>>>>> state (for example, you could use the record offset as the id) --
>>>>>>> if the offset in the state is not smaller than the current record's
>>>>>>> offset, the current record is a duplicate.
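>>>>>>>
>>>>>>> In Java, something along these lines (just a rough fragment inside
>>>>>>> a Processor; the state class, store, and the sum aggregation are
>>>>>>> made-up examples, with "store" and "context" coming from init() as
>>>>>>> usual):
>>>>>>>
>>>>>>>     // assumed state value: V plus the offset ("Id") of the record
>>>>>>>     // that last modified it
>>>>>>>     public static class ValueAndOffset {
>>>>>>>         final long value;
>>>>>>>         final long offset;
>>>>>>>         ValueAndOffset(long value, long offset) {
>>>>>>>             this.value = value;
>>>>>>>             this.offset = offset;
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public void process(String key, Long value) {
>>>>>>>         ValueAndOffset state = store.get(key);  // KeyValueStore<String, ValueAndOffset>
>>>>>>>         long offset = context.offset();
>>>>>>>         if (state != null && state.offset >= offset) {
>>>>>>>             return;                             // offset not larger -> duplicate, skip
>>>>>>>         }
>>>>>>>         long newValue = (state == null ? 0L : state.value) + value;
>>>>>>>         store.put(key, new ValueAndOffset(newValue, offset));
>>>>>>>         context.forward(key, newValue);
>>>>>>>     }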
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> In multiple sources I have read that Kafka Streams has
>>>>>>>> at-least-once delivery semantics, meaning that in case of failure
>>>>>>>> a message can be processed more than once, but it will not be
>>>>>>>> lost. This is achieved by committing the offset only after the
>>>>>>>> message processing is completely finished and all intermediate
>>>>>>>> state/data is flushed to reliable storage. There are, however,
>>>>>>>> some details I want to verify, especially related to Kafka
>>>>>>>> Streams' state store.
>>>>>>>>
>>>>>>>> Let's take as an example a Kafka Streams application which
>>>>>>>> consumes messages from topic A, processes them using state store
>>>>>>>> S, and outputs the processing results to topic B. Ensuring
>>>>>>>> at-least-once delivery semantics would mean the following order:
>>>>>>>>
>>>>>>>> 1. The message is processed.
>>>>>>>> 2. The updated state of state store S (the update caused by
>>>>>>>>    applying the processed message) is sent to the backing
>>>>>>>>    changelog topic *and acknowledged*.
>>>>>>>> 3. The processing result is sent to topic B *and acknowledged*.
>>>>>>>> 4. The offset of topic A is committed (may be deferred).
>>>>>>>>
>>>>>>>> Acknowledging is necessary, as otherwise the offset could be
>>>>>>>> committed even though messages or state were not successfully
>>>>>>>> submitted, e.g. because of batching or because there are fewer
>>>>>>>> available replicas than /min.insync.replicas/ and messages are
>>>>>>>> being buffered. Am I correct here?
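>>>>>>>>
>>>>>>>> (For reference, these are the knobs I mean -- just a sketch of the
>>>>>>>> relevant settings, not a claim about Kafka Streams' actual
>>>>>>>> defaults:
>>>>>>>>
>>>>>>>>     // producer side: how many replicas must acknowledge a write
>>>>>>>>     Properties producerConfig = new Properties();
>>>>>>>>     producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
>>>>>>>>
>>>>>>>>     // topic side: how many in-sync replicas an acks=all write needs,
>>>>>>>>     // e.g. set when creating the topic:
>>>>>>>>     //   kafka-topics.sh --create ... --config min.insync.replicas=2
>>>>>>>>
>>>>>>>> )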
>>>>>>>>
>>>>>>>> Then, regarding the state store: what state will I end up with
>>>>>>>> after a failure? Let's assume the following scenario:
>>>>>>>>
>>>>>>>> 1. I consume message K and process it in the context of state X.
>>>>>>>>    Processing the message changes the state from X to Y.
>>>>>>>> 2. I consume the next message, message L, and process it in the
>>>>>>>>    context of state Y. Processing the message changes the state
>>>>>>>>    from Y to Z.
>>>>>>>> 3. The server crashes. As no offset was committed, I will start
>>>>>>>>    processing again from message K.
>>>>>>>>
>>>>>>>> What state will I end up with when processing message K again?
>>>>>>>> Are there any guarantees that the state store will give me state
>>>>>>>> X, or could it be any state between X and Z? This is very
>>>>>>>> important, as reprocessing the message may return different
>>>>>>>> results when evaluated in the context of a different state.
>>>>>>>>
>>>
> 
> 
