Hi Krzysztof,

There are several scenarios where you want a set of records to be sent 
atomically (to a statestore, downstream topics etc). In case of failure then, 
either all of them commit successfully, or none does. We are working to add 
exactly-once processing to Kafka Streams and I suspect you'll find that useful. 
It will build on the KIP-98 that is currently being discussed, on exactly once 
delivery and transactional messaging. 

Thanks
Eno

> On 2 Feb 2017, at 20:01, Krzysztof Lesniewski, Nexiot AG 
> <krzysztof.lesniew...@nexiot.ch> wrote:
> 
> Thank you Matthias for your answer.
> 
> Of course, wherever it is possible I will use idempotent updates, but 
> unfortunately it does not apply to all my cases.
> 
> I though before about the alternative to idempotent updates you have 
> proposed, but I think it carries a risk of breaking at-least-once delivery 
> semantics in rare cases. Given that state store changelog is flushed first, 
> if an application crashes after flushing the state store, but before flushing 
> all the produced records, a reprocessed record may be discarded (considered a 
> duplicate), even though there is a chance its processing result never reached 
> the destination topic. That is:
> 
> 1. A record with offset (or other identifier) A is processed. State
>   store updates the state and Id to A
> 2. On commit:
>    1. state store changes are flushed
>    2. server crashes: result of processing record A was buffered, but
>       never sent
> 3. State store is restored
> 4. Record with offset A is reprocessed, but before its Id is already in
>   the state store, it is considered a duplicate and discarded
> 
> An extension to your solution would be to additionally store a map of states 
> for past messages <K,<V, Id, Map<Id, V>>, and then if offset in the state is 
> greater than the current record's offset, state valid for the current record 
> can be retrieved from the map. Of course it adds extra complexity, as the map 
> has to be maintained to not grow indefinitely. An easy solution to removing 
> old records would be to access committed offset and delete all entries before 
> it, but I did not find an easy way to access the committed offset.
> 
> Is my thinking correct here? How could I maintain such state store and are 
> there other gotchas I should pay attention to?
> 
> Kind Regards
> *Krzysztof Lesniewski*
> 
> On 02.02.2017 18:59, Matthias J. Sax wrote:
>> Hi,
>> 
>> About message acks: writes will be acked, however async (not sync as you
>> describe it). Only before an actual commit, KafkaProducer#flush is
>> called and all not-yet received acks are collected (ie, blocking/sync)
>> before the commit is done.
>> 
>> About state guarantees: there are none -- state might be between X and Z
>> (that's what "at-least-once" can provide -- not more). Thus, it's
>> recommended to apply idempotent updates to the stores if possible.
>> 
>> As an alternative, you might also be able to add the latest "update
>> record" to the state itself to depulicate in failure case. For example,
>> instead of putting <K,V> in the state, you put <K<V,Id>> with id being a
>> record id that did the last modification on V. Thus, each time before
>> you update the state, you can check if you did already "add" the current
>> record to the state (for example, you could use the record offset as id)
>> -- if the offset in the state is not smaller than the current record's
>> offset, the current record is a duplicate.
>> 
>> 
>> -Matthias
>> 
>> 
>> 
>> On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
>>> Hello,
>>> 
>>> In multiple sources I read that Kafka Streams has at-least-once delivery
>>> semantics, meaning that in case of failure, a message can be processed
>>> more than once, but it will not be lost. It is achieved by committing
>>> offset only after the message processing is completely finished and all
>>> intermediate state/data is flushed to a reliable storage. There are
>>> however some details I want to verify, especially related to Kafka
>>> Stream's State Store.
>>> 
>>> Let's take as an example a Kafka Streams application, which consumes
>>> messages from topic A, processes the messages using state store S and
>>> outputs the processing results to topic B. Ensuring at-least-once
>>> delivery semantics would mean following order:
>>> 
>>> 1. Message is processed
>>> 2. Updated state of state store S (update caused by applying processed
>>>    message) is sent to backing changelog topic *and acknowledged*
>>> 3. Processing result is sent to topic B *and acknowledged*
>>> 4. Offset of topic A is commited (may be deferred)
>>> 
>>> Acknowledging is necessary, as otherwise offset could be committed, even
>>> though messages or state were not successfully submitted, e.g. because
>>> of batching or because there are less available replicas than
>>> /min.insync.replicas/ and messages are being buffered. Am I correct here?
>>> 
>>> Then, regarding state store, what state will I end up with after a
>>> failure? Let's assume following scenario:
>>> 
>>> 1. I consume message K, and process it in context of state X.
>>>    Processing the message resulted in changing the state from X to Y.
>>> 2. I consume the next message, message L, and process it in context of
>>>    state Y. Processing the message resulted in changing the state from
>>>    Y to Z.
>>> 3. Server crashed. As no offset was committed, I will start processing
>>>    again from message K.
>>> 
>>> What state will I end up with when processing again the message K. Are
>>> there any guaranties that the state store will give me state X, or will
>>> it be any state between X and Z? This is very important, as reprocessing
>>> the message may return different results when evaluating in a context of
>>> a different state.
>>> 
> 

Reply via email to