Thank you Matthias for your answer.

Of course, wherever possible I will use idempotent updates, but unfortunately that does not apply to all of my cases.

I thought before about the alternative to idempotent updates you proposed, but I think it carries a risk of breaking at-least-once delivery semantics in rare cases. Given that the state store changelog is flushed first, if the application crashes after flushing the state store but before flushing all produced records, a reprocessed record may be discarded (considered a duplicate), even though there is a chance its processing result never reached the destination topic. That is:

1. A record with offset (or other identifier) A is processed. The state
   store updates the state and sets the Id to A
2. On commit:
    1. state store changes are flushed
    2. server crashes: result of processing record A was buffered, but
       never sent
3. State store is restored
4. Record with offset A is reprocessed, but because its Id is already
   in the state store, it is considered a duplicate and discarded

An extension to your solution would be to additionally store a map of past states per key, i.e. <K, <V, Id, Map<Id, V>>>. Then, if the Id in the state is greater than the current record's offset, the state valid for the current record can be retrieved from the map. Of course this adds extra complexity, as the map has to be maintained so that it does not grow indefinitely. An easy way to remove old entries would be to access the committed offset and delete all entries before it, but I did not find an easy way to access the committed offset.
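Roughly, I imagine something like this (building on your <V, Id> idea from below; all names here are placeholders, update() stands for my processing logic, and I omit imports and the Transformer/serde boilerplate):

// Per-key state: current value, Id of the last applied record, and a
// map from record Id to the state that was valid *for* that record.
class StateWithHistory {
    String value;
    long lastId;
    TreeMap<Long, String> history = new TreeMap<>();
}

// Inside a Transformer, with `store` a KeyValueStore<String, StateWithHistory>:
public KeyValue<String, String> transform(final String key, final String value) {
    final long id = context.offset(); // using the record offset as Id
    StateWithHistory state = store.get(key);

    if (state != null && state.lastId >= id) {
        // Reprocessing after a crash: recompute the result from the state
        // that was valid when this record was first processed, so the
        // possibly lost output is produced again instead of being dropped.
        final String priorState = state.history.get(id);
        return KeyValue.pair(key, update(priorState, value));
    }

    if (state == null) {
        state = new StateWithHistory();
    }
    state.history.put(id, state.value);       // state valid for this record
    state.value = update(state.value, value);
    state.lastId = id;
    // With access to the committed offset, old entries could be pruned:
    //     state.history.headMap(committedOffset).clear();
    store.put(key, state);
    return KeyValue.pair(key, state.value);
}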

Is my thinking correct here? How could I maintain such a state store, and are there other gotchas I should pay attention to?

Kind Regards
*Krzysztof Lesniewski*

On 02.02.2017 18:59, Matthias J. Sax wrote:
Hi,

About message acks: writes will be acked, however asynchronously (not
synchronously as you describe it). Only before an actual commit is
KafkaProducer#flush called, and all not-yet-received acks are collected
(i.e., blocking/sync) before the commit is done.

About state guarantees: there are none -- state might be between X and Z
(that's what "at-least-once" can provide -- not more). Thus, it's
recommended to apply idempotent updates to the stores if possible.
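For example (made-up snippet, assuming a KeyValueStore<String, Long> named
store and a numeric record value):

// idempotent: the new state depends only on the current record, so
// re-applying the record after a failure yields the same state
store.put(key, recordValue);

// not idempotent: re-applying the same record after a failure
// double-counts it
store.put(key, store.get(key) + recordValue);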

As an alternative, you might also be able to add the latest "update
record" to the state itself to deduplicate in the failure case. For
example, instead of putting <K, V> in the state, you put <K, <V, Id>>,
with Id being the id of the record that did the last modification on V.
Thus, each time before you update the state, you can check whether you
already "added" the current record to the state (for example, you could
use the record offset as the id): if the offset in the state is not
smaller than the current record's offset, the current record is a
duplicate.
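A rough sketch of this idea (all names made up; written against a newer
Streams API where Transformer has only init/transform/close, and with
applyUpdate() as a placeholder for the actual update logic):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// per-key state <V, Id>: value plus the offset of the record that last modified it
class ValueAndId {
    String value;
    long id;
}

public class DeduplicatingTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, ValueAndId> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        store = (KeyValueStore<String, ValueAndId>) context.getStateStore("my-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        final long offset = context.offset(); // the record offset serves as id
        final ValueAndId state = store.get(key);

        if (state != null && state.id >= offset) {
            // offset in the state is not smaller than the current record's
            // offset -> duplicate, drop it
            return null;
        }

        final ValueAndId updated = new ValueAndId();
        updated.value = applyUpdate(state == null ? null : state.value, value);
        updated.id = offset;
        store.put(key, updated);
        return KeyValue.pair(key, updated.value);
    }

    // placeholder for the actual modification of V
    private String applyUpdate(final String oldValue, final String newValue) {
        return newValue;
    }

    @Override
    public void close() {}
}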


-Matthias



On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
Hello,

In multiple sources I read that Kafka Streams has at-least-once delivery
semantics, meaning that in case of failure a message can be processed
more than once, but it will not be lost. This is achieved by committing
the offset only after the message processing is completely finished and
all intermediate state/data is flushed to reliable storage. There are
however some details I want to verify, especially related to the Kafka
Streams state store.

Let's take as an example a Kafka Streams application, which consumes
messages from topic A, processes the messages using state store S and
outputs the processing results to topic B. Ensuring at-least-once
delivery semantics would mean the following order:

1. Message is processed
2. Updated state of state store S (update caused by applying the
    processed message) is sent to the backing changelog topic *and
    acknowledged*
3. Processing result is sent to topic B *and acknowledged*
4. Offset of topic A is committed (may be deferred)

Acknowledging is necessary, as otherwise the offset could be committed
even though messages or state were not successfully submitted, e.g.
because of batching or because there are fewer available replicas than
/min.insync.replicas/ and messages are being buffered. Am I correct here?
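For example, I would expect something along these lines to be needed
(the property names are standard Kafka configs, the values just
illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // illustrative
// wait for acks from all in-sync replicas before a send counts as successful
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
// note: min.insync.replicas is a broker/topic-level setting, not a client config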

Then, regarding the state store: what state will I end up with after a
failure? Let's assume the following scenario:

1. I consume message K, and process it in the context of state X.
    Processing the message results in changing the state from X to Y.
2. I consume the next message, message L, and process it in the context
    of state Y. Processing the message results in changing the state
    from Y to Z.
3. Server crashes. As no offset was committed, I will start processing
    again from message K.

What state will I end up with when reprocessing message K? Are there
any guarantees that the state store will give me state X, or will it be
any state between X and Z? This is very important, as reprocessing the
message may return different results when evaluated in the context of a
different state.

