Answers inline. -Matthias
On 2/3/17 7:37 AM, Krzysztof Lesniewski, Nexiot AG wrote:
> Thank you Eno for the information on KIP-98. Making downstream topic
> and state store's changelog writes atomic would simplify the problem.
> I did not dive into the design, so I am not able to tell if it would
> bring other implications, but as KIP-98 is still under discussion, I
> have to settle on some other solution. It is valuable that you brought
> it up though.
>
> Matthias, from what you wrote I understand that even if state gets
> flushed to a local state store (e.g. RocksDB) but does not reach the
> underlying changelog before the crash, then after restarting the
> application the state gets restored from the changelog. Hence, a
> successful flush to the local state store does not matter in that
> case, as the local state store will be replaced with the restored one.
> Is that correct?

Yes. Exactly like this.

> My previous post was based on an invalid assumption, that updates to
> state (changelog topic) are flushed before updates to the downstream
> topic (that is, updates to the downstream topic are sent only after
> all updates to the changelog are acknowledged). I looked at the code
> and from what I understood, updates to all the topics (changelog and
> downstream) are sent at more or less the same time. I could not
> quickly determine though, whether it is possible (in case of failure)
> that a change to the state store's changelog was successfully written
> while a related change to the downstream topic was not. As the writes
> are not atomic, I would assume there is such a possibility, which in
> rare cases could cause losing a message.

Yes and no. Yes, it could happen that a state change is flushed to the
changelog topic but not to the downstream result topic. No, this would
not result in a loss of data: because the flush to the downstream result
topic failed, no offset commit for the input record will be done. Thus,
the input record doing the state update would get reprocessed,
recomputing the "lost" downstream result (if the operation is
idempotent). For this case, if you detect a duplicate record (and want
to avoid data loss), you do not update the list in <K, List<V>> with the
duplicate record, but you do need to recompute the aggregate result and
emit it downstream. In case the result got flushed to both changelog and
result topic, you still apply the same strategy of not updating the
state but recomputing the aggregate (because you cannot know which of
the two cases is true), and you will write the same result a second time
to the result topic and get a duplicate (but that aligns with
"at-least-once").
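
To make this concrete, here is a rough, untested sketch of that
strategy. Class, store, and type names are made up; it assumes a local
key-value store named "agg-store" that is registered for the
transformer (with logging enabled, so it is backed by a changelog
topic), and it uses the Transformer API of the 0.10.x releases this
thread refers to:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Aggregate kept in the store: the list plus the offset of the last input
// record that modified it. A serde for this class would be needed when the
// store is created; omitted here.
class ListAndOffset {
    long lastOffset = -1L;
    List<String> values = new ArrayList<>();
}

public class DedupListAggregator
        implements Transformer<String, String, KeyValue<String, List<String>>> {

    private ProcessorContext context;
    private KeyValueStore<String, ListAndOffset> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, ListAndOffset>) context.getStateStore("agg-store");
    }

    @Override
    public KeyValue<String, List<String>> transform(final String key, final String value) {
        ListAndOffset agg = store.get(key);
        if (agg == null) {
            agg = new ListAndOffset();
        }
        final long offset = context.offset();
        if (offset > agg.lastOffset) {
            // first time this input record is seen: update the aggregate
            agg.values.add(value);
            agg.lastOffset = offset;
            store.put(key, agg);
        }
        // duplicate or not, (re)emit the current aggregate, so a result that was
        // lost before a crash gets written to the result topic again
        return KeyValue.pair(key, new ArrayList<>(agg.values));
    }

    @Override
    public KeyValue<String, List<String>> punctuate(final long timestamp) {
        return null; // nothing scheduled; required by the old Transformer interface
    }

    @Override
    public void close() {}
}

You would attach it with something like
stream.transform(DedupListAggregator::new, "agg-store"). Using the
record offset as id assumes a single input topic, since offsets are only
comparable within one partition.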
> Nevertheless, in my use case such a loss in rare circumstances is
> acceptable and therefore the extra complexity required to avoid it is
> unnecessary. I will then go for the solution you have proposed with
> storing <K, <Id, V>>. I would appreciate it though if you could verify
> whether what I wrote above is correct.
>
> Kind Regards
> Krzysztof Lesniewski
>
> On 03.02.2017 01:06, Matthias J. Sax wrote:
>> Your assumption is not completely correct.
>>
>> After a crash and state store restore, the store will contain exactly
>> the same data as written to the underlying changelog. Thus, if your
>> update was buffered but never sent, the store will not contain the
>> update after the restore, and thus the record will not be considered
>> a duplicate.
>>
>>
>> -Matthias
>>
>> On 2/2/17 12:55 PM, Eno Thereska wrote:
>>> Hi Krzysztof,
>>>
>>> There are several scenarios where you want a set of records to be
>>> sent atomically (to a state store, downstream topics, etc.). In case
>>> of failure then, either all of them commit successfully, or none
>>> does. We are working to add exactly-once processing to Kafka Streams
>>> and I suspect you'll find that useful. It will build on KIP-98,
>>> which is currently being discussed, on exactly-once delivery and
>>> transactional messaging.
>>>
>>> Thanks
>>> Eno
>>>
>>>> On 2 Feb 2017, at 20:01, Krzysztof Lesniewski, Nexiot AG
>>>> <krzysztof.lesniew...@nexiot.ch> wrote:
>>>>
>>>> Thank you Matthias for your answer.
>>>>
>>>> Of course, wherever it is possible I will use idempotent updates,
>>>> but unfortunately that does not apply to all my cases.
>>>>
>>>> I thought before about the alternative to idempotent updates you
>>>> have proposed, but I think it carries a risk of breaking
>>>> at-least-once delivery semantics in rare cases. Given that the
>>>> state store changelog is flushed first, if an application crashes
>>>> after flushing the state store, but before flushing all the
>>>> produced records, a reprocessed record may be discarded (considered
>>>> a duplicate), even though there is a chance its processing result
>>>> never reached the destination topic. That is:
>>>>
>>>> 1. A record with offset (or other identifier) A is processed. The
>>>>    state store updates the state and sets the Id to A.
>>>> 2. On commit:
>>>>    1. state store changes are flushed
>>>>    2. the server crashes: the result of processing record A was
>>>>       buffered, but never sent
>>>> 3. The state store is restored.
>>>> 4. The record with offset A is reprocessed, but because its Id is
>>>>    already in the state store, it is considered a duplicate and
>>>>    discarded.
>>>>
>>>> An extension to your solution would be to additionally store a map
>>>> of states for past messages, <K, <V, Id, Map<Id, V>>>, and then if
>>>> the offset in the state is greater than the current record's
>>>> offset, the state valid for the current record can be retrieved
>>>> from the map. Of course it adds extra complexity, as the map has to
>>>> be maintained so that it does not grow indefinitely. An easy way to
>>>> remove old entries would be to access the committed offset and
>>>> delete all entries before it, but I did not find an easy way to
>>>> access the committed offset.
>>>>
>>>> Is my thinking correct here? How could I maintain such a state
>>>> store, and are there other gotchas I should pay attention to?
>>>>
>>>> Kind Regards
>>>> *Krzysztof Lesniewski*
>>>>
>>>> On 02.02.2017 18:59, Matthias J. Sax wrote:
>>>>> Hi,
>>>>>
>>>>> About message acks: writes will be acked, however asynchronously
>>>>> (not synchronously as you describe it). Only before an actual
>>>>> commit, KafkaProducer#flush is called and all not-yet received
>>>>> acks are collected (i.e., blocking/sync) before the commit is
>>>>> done.
>>>>>
>>>>> About state guarantees: there are none -- the state might be
>>>>> anywhere between X and Z (that's what "at-least-once" can provide
>>>>> -- not more). Thus, it's recommended to apply idempotent updates
>>>>> to the stores if possible.
>>>>>
>>>>> As an alternative, you might also be able to add the latest
>>>>> "update record" to the state itself to deduplicate in the failure
>>>>> case. For example, instead of putting <K, V> in the state, you put
>>>>> <K, <V, Id>>, with Id being the id of the record that did the last
>>>>> modification on V. Thus, each time before you update the state,
>>>>> you can check whether you did already "add" the current record to
>>>>> the state (for example, you could use the record offset as id) --
>>>>> if the offset in the state is not smaller than the current
>>>>> record's offset, the current record is a duplicate.
>>>>>
>>>>>
>>>>> -Matthias
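
For reference, the <K, <V, Id>> layout quoted above boils down to
something like the following plain-Java sketch (no Streams API involved;
the HashMap only stands in for the key-value state store, and all names
are made up). Re-emitting the aggregate for duplicates, as discussed at
the top of this mail, would be layered on top of this check:

import java.util.HashMap;
import java.util.Map;

// The stored value carries the offset (id) of the record that last modified
// it, so a reprocessed record can be recognized as a duplicate.
public class OffsetGuardedState {

    static class ValueAndOffset {
        final String value;
        final long offset;

        ValueAndOffset(final String value, final long offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    // stands in for the key-value state store
    private final Map<String, ValueAndOffset> store = new HashMap<>();

    // Applies the update only if this record has not been applied before.
    public boolean maybeUpdate(final String key, final String newValue, final long recordOffset) {
        final ValueAndOffset current = store.get(key);
        if (current != null && current.offset >= recordOffset) {
            // offset in the state is not smaller than the record's offset -> duplicate
            return false;
        }
        store.put(key, new ValueAndOffset(newValue, recordOffset));
        return true;
    }
}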
>>>>> On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>>>>> Hello,
>>>>>>
>>>>>> In multiple sources I read that Kafka Streams has at-least-once
>>>>>> delivery semantics, meaning that in case of failure a message can
>>>>>> be processed more than once, but it will not be lost. It is
>>>>>> achieved by committing the offset only after the message
>>>>>> processing is completely finished and all intermediate state/data
>>>>>> is flushed to reliable storage. There are however some details I
>>>>>> want to verify, especially related to Kafka Streams' state store.
>>>>>>
>>>>>> Let's take as an example a Kafka Streams application, which
>>>>>> consumes messages from topic A, processes the messages using
>>>>>> state store S, and outputs the processing results to topic B.
>>>>>> Ensuring at-least-once delivery semantics would mean the
>>>>>> following order:
>>>>>>
>>>>>> 1. The message is processed.
>>>>>> 2. The updated state of state store S (the update caused by
>>>>>>    applying the processed message) is sent to the backing
>>>>>>    changelog topic *and acknowledged*.
>>>>>> 3. The processing result is sent to topic B *and acknowledged*.
>>>>>> 4. The offset of topic A is committed (may be deferred).
>>>>>>
>>>>>> Acknowledging is necessary, as otherwise the offset could be
>>>>>> committed even though messages or state were not successfully
>>>>>> submitted, e.g. because of batching or because there are fewer
>>>>>> available replicas than min.insync.replicas and messages are
>>>>>> being buffered. Am I correct here?
>>>>>>
>>>>>> Then, regarding the state store, what state will I end up with
>>>>>> after a failure? Let's assume the following scenario:
>>>>>>
>>>>>> 1. I consume message K and process it in the context of state X.
>>>>>>    Processing the message results in changing the state from X
>>>>>>    to Y.
>>>>>> 2. I consume the next message, message L, and process it in the
>>>>>>    context of state Y. Processing the message results in changing
>>>>>>    the state from Y to Z.
>>>>>> 3. The server crashes. As no offset was committed, I will start
>>>>>>    processing again from message K.
>>>>>>
>>>>>> What state will I end up with when processing message K again?
>>>>>> Are there any guarantees that the state store will give me state
>>>>>> X, or could it be any state between X and Z? This is very
>>>>>> important, as reprocessing the message may return different
>>>>>> results when evaluated in the context of a different state.
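
And regarding the acknowledgement question in the original mail quoted
above, the commit frequency and the producer acknowledgement level are
both configurable. A sketch with example values (the application id and
broker address are placeholders, not recommendations):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class AtLeastOnceConfigSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // step 4 of the quoted list: how often offsets are committed; the flushes
        // of steps 2 and 3 (changelog and downstream writes) happen before each commit
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);

        // steps 2 and 3: configs prefixed with "producer." are forwarded to the
        // internal producer, so this makes changelog and downstream writes wait
        // for acknowledgement from all in-sync replicas
        props.put("producer." + ProducerConfig.ACKS_CONFIG, "all");

        // replication factor for internal topics such as the state store changelog
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);

        System.out.println(props);
    }
}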