Hi Krzysztof,

There are several scenarios where you want a set of records to be sent atomically (to a state store, downstream topics, etc.): in case of failure, either all of them commit successfully or none does. We are working to add exactly-once processing to Kafka Streams, and I suspect you'll find that useful. It will build on KIP-98, which is currently being discussed and covers exactly-once delivery and transactional messaging.

Thanks
Eno
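
To illustrate the "all or nothing" behaviour KIP-98 aims at, here is a minimal sketch of producing a set of records atomically with the transactional producer API proposed there. The configuration values, topic name, and keys are made up for illustration only:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;

    public class AtomicSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // The transactional id ties all sends below into a single transaction.
            props.put("transactional.id", "example-transactional-producer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // All records sent between begin and commit become visible atomically.
                producer.send(new ProducerRecord<>("topic-b", "key-1", "result-1"));
                producer.send(new ProducerRecord<>("topic-b", "key-2", "result-2"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // None of the records of an aborted transaction are exposed to consumers
                // that read only committed data. Fatal errors (e.g. a fenced producer)
                // require closing the producer instead of aborting.
                producer.abortTransaction();
            } finally {
                producer.close();
            }
        }
    }
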
> On 2 Feb 2017, at 20:01, Krzysztof Lesniewski, Nexiot AG <krzysztof.lesniew...@nexiot.ch> wrote:
>
> Thank you Matthias for your answer.
>
> Of course, wherever possible I will use idempotent updates, but unfortunately that does not apply to all my cases.
>
> I thought before about the alternative to idempotent updates you proposed, but I think it carries a risk of breaking at-least-once delivery semantics in rare cases. Given that the state store changelog is flushed first, if an application crashes after flushing the state store but before flushing all the produced records, a reprocessed record may be discarded (considered a duplicate), even though there is a chance its processing result never reached the destination topic. That is:
>
> 1. A record with offset (or other identifier) A is processed. The state store updates the state and sets the Id to A.
> 2. On commit:
>    1. the state store changes are flushed
>    2. the server crashes: the result of processing record A was buffered, but never sent
> 3. The state store is restored.
> 4. The record with offset A is reprocessed, but because its Id is already in the state store, it is considered a duplicate and discarded.
>
> An extension to your solution would be to additionally store a map of states for past messages, <K, <V, Id, Map<Id, V>>>, and then, if the offset in the state is greater than the current record's offset, the state valid for the current record can be retrieved from the map. Of course this adds extra complexity, as the map has to be maintained so it does not grow indefinitely. An easy solution for removing old entries would be to access the committed offset and delete all entries before it, but I did not find an easy way to access the committed offset.
>
> Is my thinking correct here? How could I maintain such a state store, and are there other gotchas I should pay attention to?
>
> Kind Regards
> *Krzysztof Lesniewski*
>
> On 02.02.2017 18:59, Matthias J. Sax wrote:
>> Hi,
>>
>> About message acks: writes will be acked, however asynchronously (not synchronously as you describe it). Only right before an actual commit is KafkaProducer#flush called, and all not-yet-received acks are collected (i.e., blocking/sync) before the commit is done.
>>
>> About state guarantees: there are none -- the state might be anywhere between X and Z (that's what "at-least-once" can provide -- not more). Thus, it's recommended to apply idempotent updates to the stores if possible.
>>
>> As an alternative, you might also be able to add the latest "update record" to the state itself to deduplicate in the failure case. For example, instead of putting <K, V> in the state, you put <K, <V, Id>>, with Id being the id of the record that made the last modification to V. Thus, each time before you update the state, you can check whether you already "added" the current record to the state (for example, you could use the record offset as the id) -- if the offset in the state is not smaller than the current record's offset, the current record is a duplicate.
>>
>>
>> -Matthias
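
A minimal sketch of the de-duplication approach described in the message above, using the low-level Processor API: the store keeps, next to each value, the offset of the record that last modified it. The store name "dedup-store", the ValueAndOffset wrapper, and the update logic are made up for illustration, and a matching Serde for ValueAndOffset has to be provided when the store is added to the topology:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class DeduplicatingProcessor implements Processor<String, String> {

        // Value plus the offset of the input record that produced it.
        public static class ValueAndOffset {
            final String value;
            final long offset;
            ValueAndOffset(String value, long offset) {
                this.value = value;
                this.offset = offset;
            }
        }

        private ProcessorContext context;
        private KeyValueStore<String, ValueAndOffset> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, ValueAndOffset>) context.getStateStore("dedup-store");
        }

        @Override
        public void process(String key, String value) {
            ValueAndOffset current = store.get(key);
            long offset = context.offset();
            if (current != null && current.offset >= offset) {
                // The state already reflects this record (or a later one), so the
                // record is a duplicate from re-processing: skip the update.
                return;
            }
            String updated = applyUpdate(current == null ? null : current.value, value);
            store.put(key, new ValueAndOffset(updated, offset));
            context.forward(key, updated);
        }

        // Application-specific state transition; a placeholder for illustration.
        private String applyUpdate(String oldValue, String newValue) {
            return newValue;
        }

        @Override
        public void punctuate(long timestamp) {}

        @Override
        public void close() {}
    }

Note that the sketch skips both the state update and the downstream forward for a detected duplicate; whether that is always safe is exactly the corner case questioned further up in this thread.
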
>> On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>> Hello,
>>>
>>> In multiple sources I read that Kafka Streams has at-least-once delivery semantics, meaning that in case of failure a message can be processed more than once, but it will not be lost. This is achieved by committing the offset only after the message processing is completely finished and all intermediate state/data is flushed to reliable storage. There are, however, some details I want to verify, especially related to Kafka Streams' state store.
>>>
>>> Let's take as an example a Kafka Streams application which consumes messages from topic A, processes the messages using state store S, and outputs the processing results to topic B. Ensuring at-least-once delivery semantics would mean the following order:
>>>
>>> 1. The message is processed.
>>> 2. The updated state of state store S (the update caused by applying the processed message) is sent to the backing changelog topic *and acknowledged*.
>>> 3. The processing result is sent to topic B *and acknowledged*.
>>> 4. The offset of topic A is committed (may be deferred).
>>>
>>> Acknowledging is necessary, as otherwise the offset could be committed even though the messages or the state were not successfully submitted, e.g. because of batching or because there are fewer available replicas than /min.insync.replicas/ and messages are being buffered. Am I correct here?
>>>
>>> Then, regarding the state store, what state will I end up with after a failure? Let's assume the following scenario:
>>>
>>> 1. I consume message K and process it in the context of state X. Processing the message results in changing the state from X to Y.
>>> 2. I consume the next message, message L, and process it in the context of state Y. Processing the message results in changing the state from Y to Z.
>>> 3. The server crashes. As no offset was committed, I will start processing again from message K.
>>>
>>> What state will I end up with when processing message K again? Are there any guarantees that the state store will give me state X, or will it be any state between X and Z? This is very important, as reprocessing the message may return different results when it is evaluated in the context of a different state.
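
For reference, the at-least-once commit ordering discussed in the quoted question (flush state stores and their changelogs, flush the producer and await its acks, only then commit the input offsets) can be summarised in a deliberately simplified sketch. This is illustrative only and not the actual Kafka Streams implementation; the class and parameter names are made up:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateStore;

    public class CommitOrderSketch {

        static void commit(List<StateStore> stores,
                           KafkaProducer<byte[], byte[]> producer,
                           KafkaConsumer<byte[], byte[]> consumer,
                           Map<TopicPartition, OffsetAndMetadata> inputOffsets) {
            // 1. Flush the state stores; their changelog records are handed to the producer.
            for (StateStore store : stores) {
                store.flush();
            }
            // 2. Flush the producer: blocks until every buffered record (changelog and
            //    output records) has been acknowledged, or an error surfaces.
            producer.flush();
            // 3. Only now commit the input offsets. If the process dies before this
            //    point, the input records are re-read and re-processed, which is the
            //    at-least-once guarantee (and the source of possible duplicates).
            consumer.commitSync(inputOffsets);
        }
    }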