Completely agree. Very good and deep analysis!

-Matthias
On 2/6/17 3:15 AM, Krzysztof Lesniewski, Nexiot AG wrote:
> Thank you Matthias for your answer.
>
> In general I agree with what you are saying. I was however thinking
> about one case in which I think the at-least-once delivery semantics
> may still be broken. However, I came to the conclusion that it does
> not really matter in that application. Here is the case:
>
> Let's assume we have an application which consumes numbers and, for
> each number, pushes downstream an accumulated sum of all previous
> numbers, as below (offsets are marked with letters):
>
> (a) 1, (b) 3, (c) 2, (d) 1, (e) 4 => Streams application =>
> (a) 1, (b) 4, (c) 6, (d) 7, (e) 11
>
> The state stores the offset of the last message affecting the state
> and the accumulated sum (including that message), which is used with
> the next message to compute the next accumulated sum. That is, after
> processing message (a) the state becomes <a, 1>, after processing
> message (b) the state becomes <b, 4>, and so on.
>
> Now let's assume the following scenario:
>
> 1. The application starts consuming messages. It consumes the first
>    three messages: (a), (b), and (c).
> 2. The application crashes. The state store's changes were
>    successfully written to its backing changelog topic, whereas the
>    write to the downstream topic failed. Hence:
>    * Changelog topic: {K: <c, 6>}
>    * Downstream topic: []
> 3. The application resumes. The state store gets restored and holds
>    the value <c, 6>.
> 4. The application starts consuming messages. Since no offset was
>    committed, it starts processing from message (a).
> 5. Message (a) is evaluated against the kept state. Because the
>    offset in the state (c) is larger than (a), there is not enough
>    information to process message (a), so the message is discarded.
>    The same happens to message (b).
> 6. Message (c) is processed. As it has the same offset as the one in
>    the state, we can determine the message which should be written to
>    the downstream topic, so we push a message with value 6.
> 7. Messages (d) and (e) are processed normally.
> 8. The downstream topic now contains messages [(c) 6, (d) 7, (e) 11].
>
> In this scenario we lost the output of processing messages (a) and
> (b). However, if we are interested in the currently accumulated sum,
> we can still read it, and hence losing those messages did not change
> the final result.
>
> If we really needed to, we could preserve the state as a map
> <K, map<offset, state>>. This way the processing would be idempotent
> and we could still produce the outcome for messages (a) and (b). It
> adds extra complexity, however: we need to maintain the map over time
> by deleting entries older than the committed offset (see the sketch
> after this message).
>
> What do you think Matthias?
>
> Kind Regards
> Krzysztof Lesniewski
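A minimal sketch, in plain Java, of the per-offset state map Krzysztof
proposes above. The class name, the TreeMap representation, and the
external "committed offset" input are illustrative assumptions, not an
API from the thread; persisting one such value per key in a state store
(and the serde for it) is left out to keep the sketch short.

    import java.util.Map;
    import java.util.TreeMap;

    // State kept per key K: input offset -> accumulated sum *including*
    // the record at that offset (the <K, map<offset, state>> idea).
    class PerOffsetSumState {
        private final TreeMap<Long, Long> sumByOffset = new TreeMap<>();

        // Returns the sum to emit for this (offset, value). Reprocessing
        // a record recomputes the same answer instead of discarding it,
        // so the outputs for (a) and (b) in the scenario above survive.
        long process(long offset, long value) {
            Long seen = sumByOffset.get(offset);
            if (seen != null) {
                return seen;  // duplicate: re-emit the original result
            }
            Map.Entry<Long, Long> prev = sumByOffset.lowerEntry(offset);
            long sum = (prev == null ? 0L : prev.getValue()) + value;
            sumByOffset.put(offset, sum);
            return sum;
        }

        // The maintenance Krzysztof mentions: drop entries for offsets
        // the consumer has already committed, since those records cannot
        // be re-consumed. How to learn the committed offset is the open
        // question raised later in the thread.
        void pruneBelow(long committedOffset) {
            sumByOffset.headMap(committedOffset).clear();
        }
    }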
> On 03.02.2017 20:02, Matthias J. Sax wrote:
>> Answers inline.
>>
>> -Matthias
>>
>> On 2/3/17 7:37 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>> Thank you Eno for the information on KIP-98. Making the downstream
>>> topic and state store changelog writes atomic would indeed simplify
>>> the problem. I did not dive into the design, so I am not able to
>>> tell whether it would bring other implications, but as KIP-98 is
>>> still under discussion, I have to settle on some other solution. It
>>> is valuable that you brought it up though.
>>>
>>> Matthias, from what you wrote I understand that even if state gets
>>> flushed to a local state store (e.g. RocksDB) but does not reach
>>> the underlying changelog before the crash, then after restarting
>>> the application the state gets restored from the changelog, and
>>> hence the successful flush to the local state store does not matter
>>> in that case, as the local state store will be replaced with the
>>> restored one. Is that correct?
>> Yes. Exactly like this.
>>
>>> My previous post was based on an invalid assumption: that updates
>>> to the state (changelog topic) are flushed before updates to the
>>> downstream topic (that is, that updates to the downstream topic are
>>> sent only after all updates to the changelog are acknowledged). I
>>> looked at the code, and from what I understood, updates to all the
>>> topics (changelog and downstream) are sent at more or less the same
>>> time. I could not quickly determine, though, whether it is possible
>>> (in case of failure) that a change to the state store's changelog
>>> was successfully written while a related change to the downstream
>>> topic was not. As the writes are not atomic, I would assume there
>>> is such a possibility, which in rare cases could cause losing a
>>> message.
>> Yes and no.
>>
>> Yes, it could happen that a state change is flushed to the changelog
>> topic but not to the downstream result topic.
>>
>> No, this would not result in a loss of data. Because the flush to
>> the downstream result topic failed, no offset commit for the input
>> record will be done. Thus, the input record doing the state update
>> will get reprocessed, recomputing the "lost" downstream result (if
>> the operation is idempotent).
>>
>> For this case, if you detect a duplicate record (and want to avoid
>> data loss), you will not update the list in <K, list<V>> with the
>> duplicate record, but you will need to recompute the aggregate
>> result and emit it downstream.
>>
>> In case the result got flushed to both the changelog and the result
>> topic, you will still apply the same strategy of not updating the
>> state but recomputing the aggregate (because you cannot know which
>> of the two cases is true), and you will write the same result a
>> second time to the result topic and get a duplicate (but that aligns
>> with "at-least-once").
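The "do not update, but recompute and re-emit" decision Matthias
describes can be written down in a few lines. A sketch under the
thread's assumptions (the state holds the aggregate plus the Id/offset
of the record that last modified it); the class and method names are
made up for illustration:

    // State per key: <V, Id>, with the running sum as V and the input
    // offset of the last applied record as Id.
    class DedupAggregate {
        long lastOffset = -1L;
        long sum = 0L;

        // Returns the value to emit downstream, or null if nothing can
        // be emitted for this record.
        Long apply(long offset, long value) {
            if (offset > lastOffset) {   // new record: update state, emit
                sum += value;
                lastOffset = offset;
                return sum;
            }
            if (offset == lastOffset) {  // the record that produced the
                return sum;              // stored state: re-emit its result
            }                            // (possibly a duplicate write --
                                         // fine under at-least-once)
            return null;                 // older duplicate: its result is
                                         // not reconstructible from this
                                         // state (Krzysztof's 2/6 case)
        }
    }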
>>> Nevertheless, in my use case such a loss in rare circumstances is
>>> acceptable, and therefore the extra complexity required to avoid it
>>> is unnecessary. I will then go for the solution you have proposed
>>> with storing <K, <Id, V>>. I would appreciate it though if you
>>> could verify whether what I wrote above is correct.
>>>
>>> Kind Regards
>>> Krzysztof Lesniewski
>>>
>>> On 03.02.2017 01:06, Matthias J. Sax wrote:
>>>> Your assumption is not completely correct.
>>>>
>>>> After a crash and state store restore, the store will contain
>>>> exactly the same data as written to the underlying changelog.
>>>> Thus, if your update was buffered but never sent, the store will
>>>> not contain the update after restore, and thus the record will not
>>>> be considered a duplicate.
>>>>
>>>> -Matthias
>>>>
>>>> On 2/2/17 12:55 PM, Eno Thereska wrote:
>>>>> Hi Krzysztof,
>>>>>
>>>>> There are several scenarios where you want a set of records to be
>>>>> sent atomically (to a state store, downstream topics, etc.). In
>>>>> case of failure then, either all of them commit successfully, or
>>>>> none does. We are working to add exactly-once processing to Kafka
>>>>> Streams, and I suspect you'll find that useful. It will build on
>>>>> KIP-98, which is currently being discussed, on exactly-once
>>>>> delivery and transactional messaging.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>> On 2 Feb 2017, at 20:01, Krzysztof Lesniewski, Nexiot AG
>>>>>> <krzysztof.lesniew...@nexiot.ch> wrote:
>>>>>>
>>>>>> Thank you Matthias for your answer.
>>>>>>
>>>>>> Of course, wherever it is possible I will use idempotent
>>>>>> updates, but unfortunately that does not apply to all my cases.
>>>>>>
>>>>>> I thought before about the alternative to idempotent updates you
>>>>>> proposed, but I think it carries a risk of breaking
>>>>>> at-least-once delivery semantics in rare cases. Given that the
>>>>>> state store changelog is flushed first, if an application
>>>>>> crashes after flushing the state store, but before flushing all
>>>>>> the produced records, a reprocessed record may be discarded
>>>>>> (considered a duplicate), even though there is a chance its
>>>>>> processing result never reached the destination topic. That is:
>>>>>>
>>>>>> 1. A record with offset (or other identifier) A is processed.
>>>>>>    The state store updates the state and sets the Id to A.
>>>>>> 2. On commit:
>>>>>>    1. the state store changes are flushed
>>>>>>    2. the server crashes: the result of processing record A was
>>>>>>       buffered, but never sent
>>>>>> 3. The state store is restored.
>>>>>> 4. The record with offset A is reprocessed, but because its Id
>>>>>>    is already in the state store, it is considered a duplicate
>>>>>>    and discarded.
>>>>>>
>>>>>> An extension to your solution would be to additionally store a
>>>>>> map of states for past messages, <K, <V, Id, Map<Id, V>>>, and
>>>>>> then, if the offset in the state is greater than the current
>>>>>> record's offset, the state valid for the current record can be
>>>>>> retrieved from the map. Of course it adds extra complexity, as
>>>>>> the map has to be maintained so it does not grow indefinitely.
>>>>>> An easy way to remove old records would be to access the
>>>>>> committed offset and delete all entries before it, but I did not
>>>>>> find an easy way to access the committed offset.
>>>>>>
>>>>>> Is my thinking correct here? How could I maintain such a state
>>>>>> store, and are there other gotchas I should pay attention to?
>>>>>>
>>>>>> Kind Regards
>>>>>> *Krzysztof Lesniewski*
>>>>>>
>>>>>> On 02.02.2017 18:59, Matthias J. Sax wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> About message acks: writes will be acked, however
>>>>>>> asynchronously (not synchronously as you describe it). Only
>>>>>>> before an actual commit, KafkaProducer#flush is called and all
>>>>>>> not-yet-received acks are collected (i.e., blocking/sync)
>>>>>>> before the commit is done.
>>>>>>>
>>>>>>> About state guarantees: there are none -- the state might be
>>>>>>> anywhere between X and Z (that's what "at-least-once" can
>>>>>>> provide -- not more). Thus, it's recommended to apply
>>>>>>> idempotent updates to the stores if possible.
>>>>>>>
>>>>>>> As an alternative, you might also be able to add the latest
>>>>>>> "update record" to the state itself to deduplicate in the
>>>>>>> failure case. For example, instead of putting <K, V> in the
>>>>>>> state, you put <K, <V, Id>>, with Id being the id of the record
>>>>>>> that did the last modification on V. Thus, each time before you
>>>>>>> update the state, you can check whether you already "added" the
>>>>>>> current record to the state (for example, you could use the
>>>>>>> record offset as the id) -- if the offset in the state is not
>>>>>>> smaller than the current record's offset, the current record is
>>>>>>> a duplicate.
>>>>>>>
>>>>>>> -Matthias
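Wired into the (0.10.x-era) low-level Processor API, the <K, <V, Id>>
pattern looks roughly as follows. The store name "sums", the
ValueAndOffset holder, and its (omitted) serde are assumptions for
illustration; ProcessorContext#offset() supplies the record offset used
as the Id. The handling of older duplicates follows the thread's
conclusion that their results are not reconstructible from this state.

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    class ValueAndOffset {
        final long sum;     // V: the aggregate
        final long offset;  // Id: offset of the record that last set V
        ValueAndOffset(long sum, long offset) {
            this.sum = sum;
            this.offset = offset;
        }
    }

    class DedupProcessor implements Processor<String, Long> {
        private ProcessorContext context;
        private KeyValueStore<String, ValueAndOffset> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, ValueAndOffset>)
                    context.getStateStore("sums");
        }

        @Override
        public void process(String key, Long value) {
            ValueAndOffset state = store.get(key);
            long offset = context.offset();
            if (state != null && offset < state.offset) {
                return;  // older duplicate: result lost, discard
            }
            if (state != null && offset == state.offset) {
                context.forward(key, state.sum);  // re-emit, no state update
                return;
            }
            long sum = (state == null ? 0L : state.sum) + value;
            store.put(key, new ValueAndOffset(sum, offset));
            context.forward(key, sum);
        }

        @Override
        public void punctuate(long timestamp) {}

        @Override
        public void close() {}
    }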
>>>>>>> On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> In multiple sources I read that Kafka Streams has
>>>>>>>> at-least-once delivery semantics, meaning that in case of
>>>>>>>> failure a message can be processed more than once, but it will
>>>>>>>> not be lost. This is achieved by committing the offset only
>>>>>>>> after the message processing is completely finished and all
>>>>>>>> intermediate state/data is flushed to reliable storage. There
>>>>>>>> are however some details I want to verify, especially related
>>>>>>>> to Kafka Streams' state store.
>>>>>>>>
>>>>>>>> Let's take as an example a Kafka Streams application which
>>>>>>>> consumes messages from topic A, processes the messages using
>>>>>>>> state store S, and outputs the processing results to topic B.
>>>>>>>> Ensuring at-least-once delivery semantics would mean the
>>>>>>>> following order (see the sketch at the end of this thread):
>>>>>>>>
>>>>>>>> 1. The message is processed.
>>>>>>>> 2. The updated state of state store S (the update caused by
>>>>>>>>    applying the processed message) is sent to the backing
>>>>>>>>    changelog topic *and acknowledged*.
>>>>>>>> 3. The processing result is sent to topic B *and
>>>>>>>>    acknowledged*.
>>>>>>>> 4. The offset of topic A is committed (may be deferred).
>>>>>>>>
>>>>>>>> Acknowledging is necessary, as otherwise the offset could be
>>>>>>>> committed even though messages or state were not successfully
>>>>>>>> submitted, e.g. because of batching, or because there are
>>>>>>>> fewer available replicas than /min.insync.replicas/ and
>>>>>>>> messages are being buffered. Am I correct here?
>>>>>>>>
>>>>>>>> Then, regarding the state store, what state will I end up with
>>>>>>>> after a failure? Let's assume the following scenario:
>>>>>>>>
>>>>>>>> 1. I consume message K and process it in the context of state
>>>>>>>>    X. Processing the message results in changing the state
>>>>>>>>    from X to Y.
>>>>>>>> 2. I consume the next message, message L, and process it in
>>>>>>>>    the context of state Y. Processing the message results in
>>>>>>>>    changing the state from Y to Z.
>>>>>>>> 3. The server crashes. As no offset was committed, I will
>>>>>>>>    start processing again from message K.
>>>>>>>>
>>>>>>>> What state will I end up with when processing message K again?
>>>>>>>> Are there any guarantees that the state store will give me
>>>>>>>> state X, or will it be any state between X and Z? This is very
>>>>>>>> important, as reprocessing the message may return different
>>>>>>>> results when evaluated in the context of a different state.
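For intuition only, the ordering in the numbered list of the original
question can be reproduced with the plain consumer/producer clients:
send results asynchronously, then flush (collecting all outstanding
acks, as Matthias describes), and only then commit the input offsets.
This is a sketch of the general at-least-once pattern, not of Kafka
Streams internals; the topic names, types, and process() stand-in are
illustrative.

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class AtLeastOnceLoop {
        static void run(KafkaConsumer<String, String> consumer,
                        KafkaProducer<String, String> producer) {
            consumer.subscribe(Collections.singletonList("topic-A"));
            while (true) {
                for (ConsumerRecord<String, String> rec :
                         consumer.poll(Duration.ofMillis(100))) {
                    String result = process(rec.value());  // 1. process
                    // 2./3. async sends (changelog/result topics)
                    producer.send(
                        new ProducerRecord<>("topic-B", rec.key(), result));
                }
                producer.flush();       // block until every send is acked
                consumer.commitSync();  // 4. commit offsets only afterwards
            }
        }

        static String process(String value) { return value; } // stand-in
    }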