Thank you Matthias for your answer.

In general I agree with what you are saying. I was however thinking about one case in which I think the at-least-once delivery semantics may still be broken. However, I came to the conclusion that it does not really matter in that application. Here is the case:

Let's assume we have an application which consumes numbers and for each number pushes downstream an accumulated sum of all previous numbers as below (offsets are marked with letters):

(a) 1, (b) 3, (c) 2, (d) 1, (e) 4 => Streams application => (a) 1, (b) 4, (c) 6, (d) 7, (e) 11

The state stores offset of the last message affecting the state and the accumulated sum (including that message), which is used with a next message to compute the accumulated sum. That is after processing message (a) state becomes <a, 1>, after processing message (b) state becomes <b, 4> and so on.
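
A minimal sketch of this accumulator in plain Python, with no Kafka involved. The function name `accumulate` and the tuple layout `(offset, running_sum)` are illustrative assumptions; the letters stand in for real numeric offsets.

```python
def accumulate(state, offset, value):
    """Process one record.

    state is None (nothing seen yet) or (last_offset, running_sum).
    Returns (new_state, output_for_downstream).
    """
    running_sum = state[1] if state is not None else 0
    total = running_sum + value
    return (offset, total), total


records = [("a", 1), ("b", 3), ("c", 2), ("d", 1), ("e", 4)]

state = None
downstream = []
for offset, value in records:
    state, output = accumulate(state, offset, value)
    downstream.append((offset, output))

print(downstream)  # [('a', 1), ('b', 4), ('c', 6), ('d', 7), ('e', 11)]
print(state)       # ('e', 11)
```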

Now let's assume the following scenario:

1. Application starts consuming messages. Consumes first 3 messages
   (a), (b), and (c)
2. Application crashes. State store's changes were successfully written
   to its backing changelog topic, whereas writing to downstream topic
   failed. Hence:
     * Changelog topic: {K: <c, 6>}
     * Downstream topic: []
3. Application resumes. State store gets restored and has value <c, 6>
4. Application starts consuming messages. Since no offset was
   committed, the application starts processing from message (a)
5. Message (a) is evaluated against the kept state. Because the offset
   in the state (c) is larger than (a), there is not enough information
   to process the message (a), so the message is discarded. The same
   happens to the message (b).
6. Message (c) is processed. As it has the same offset as the one in
   the state, we can determine the message, which should be written to
   the downstream topic, so we push message with value 6.
7. Messages (d) and (e) are processed normally.
8. The downstream topic now contains messages [(c) 6, (d) 7, (e) 11].
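
The scenario above can be replayed with a small simulation. This is only a sketch under assumptions: `process_with_dedup` is a hypothetical name, the restored state is taken to be the accumulated sum after message (c), that is ("c", 6), and single letters are compared as if they were offsets.

```python
def process_with_dedup(state, offset, value):
    """Process one record against state = (last_offset, running_sum) or None.

    Returns (new_state, output); output is None when the record is discarded.
    """
    running_sum = 0
    if state is not None:
        last_offset, running_sum = state
        if offset < last_offset:
            # Older than the state: the sum at that point is unknown, discard.
            return state, None
        if offset == last_offset:
            # Same record as the one in the state: re-emit the stored sum.
            return state, running_sum
    total = running_sum + value
    return (offset, total), total


records = [("a", 1), ("b", 3), ("c", 2), ("d", 1), ("e", 4)]

# State restored from the changelog after the crash; nothing was committed,
# so consumption restarts from message (a).
state = ("c", 6)
downstream = []
for offset, value in records:
    state, output = process_with_dedup(state, offset, value)
    if output is not None:
        downstream.append((offset, output))

print(downstream)  # [('c', 6), ('d', 7), ('e', 11)]
```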

In this scenario we lost the output of processing messages (a) and (b). However, if we are only interested in the currently accumulated sum, we can still read it, and hence losing those messages did not change the final result.

If we really needed to, we could preserve state as a map <K, map<offset, state>>. This way the processing would be idempotent and we could still produce the outcome for messages (a) and (b). It adds extra complexity, however: we need to maintain the map over time by deleting entries older than the committed offset.
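
A rough sketch of that map-based variant for a single key, again in plain Python. The class `HistoryAccumulator`, its methods, and the rule of always keeping the newest entry when pruning are assumptions made for illustration; how to obtain the committed offset inside a Streams application is left open here.

```python
class HistoryAccumulator:
    """Keeps offset -> running sum (including that offset's record)."""

    def __init__(self, history=None):
        self.history = dict(history or {})

    def process(self, offset, value):
        if offset in self.history:
            # Replayed record: re-emit the stored result, do not touch state.
            return self.history[offset]
        latest = max(self.history) if self.history else None
        if latest is not None and offset < latest:
            raise ValueError("state for this offset was already pruned")
        previous = self.history[latest] if latest is not None else 0
        self.history[offset] = previous + value
        return self.history[offset]

    def prune(self, committed_offset):
        """Drop entries older than the committed offset, keep the newest one."""
        if not self.history:
            return
        latest = max(self.history)
        self.history = {
            offset: total
            for offset, total in self.history.items()
            if offset >= committed_offset or offset == latest
        }


# State restored from the changelog after the crash described above.
acc = HistoryAccumulator({"a": 1, "b": 4, "c": 6})
records = [("a", 1), ("b", 3), ("c", 2), ("d", 1), ("e", 4)]
outputs = [(offset, acc.process(offset, value)) for offset, value in records]
print(outputs)  # [('a', 1), ('b', 4), ('c', 6), ('d', 7), ('e', 11)]

acc.prune("d")
print(acc.history)  # {'d': 7, 'e': 11}
```

With the history in place, the replay after the crash produces results for (a) and (b) as well, at the cost of the pruning logic.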

What do you think Matthias?

Kind Regards
Krzysztof Lesniewski

On 03.02.2017 20:02, Matthias J. Sax wrote:
Answers inline.

-Matthias


On 2/3/17 7:37 AM, Krzysztof Lesniewski, Nexiot AG wrote:
Thank you Eno for the information on KIP-98. Making downstream topic and
state store's changelog writes atomic would indeed simplify the problem. I
did not dive into the design, so I am not able to tell if it would bring
other implications, but as KIP-98 is so far under discussion, I have to
settle on some other solution. It is valuable you brought it though.

Matthias, from what you wrote I understand, that even if state gets
flushed to a local state store (e.g. RocksDB), but it does not reach
underlying changelog before the crash, after restarting the application,
the state gets restored from changelog and hence successful flush to
local state store does not matter in that case, as the local state store
will be replaced with the restored one. Is that correct?
Yes. Exactly like this.


My previous post was based on an invalid assumption, that updates to
state (changelog topic) are flushed before updates to downstream topic
(that is updates to downstream topic are sent only after all updates to
changelog are acknowledged). I looked at the code and from what I
understood, updates to all the topics (changelog and downstream) are
sent at more or less the same time. I could not quickly determine
though, if it is possible (in case of failure) that a change to state
store's changelog was successfully written while a related change to
downstream topic was not written. As the writes are not atomic I would
assume there is such possibility, which in rare cases could cause losing
a message.
Yes and no.

Yes, it could happen, that a state change is flushed to changelog topic
but not to downstream result topic.

No, this would not result in a loss of data. Because the flush to
downstream result topic failed, no offset commit for the input record
will be done. And thus, the input record doing the state update would
get reprocessed, recomputing the "lost" downstream topic result (if
operation is idempotent).

For this case, if you detect a duplicate record (and to avoid data
loss), you will not update the list in <K,list<V>> with the duplicate
record, but you will need to recompute the aggregate result and emit it
downstream.

In case the result got flushed to changelog and result topic, you will
still apply the same strategy of not updating the state, but recomputing
the aggregate (because you cannot know which of both cases is true), and
you will write the same result a second time to the result topic and get
a duplicate (but that aligns with "at-least-once").

Nevertheless, in my use case such loss in rare circumstances is
acceptable and therefore extra complexity required to avoid it is
unnecessary. I will then go for the solution you have proposed with
storing <K,<Id, V>>. I would appreciate though if you could verify
whether what I wrote above is correct.

Kind Regards
Krzysztof Lesniewski

On 03.02.2017 01:06, Matthias J. Sax wrote:
Your assumption is not completely correct.

After a crash and State Store restore, the store will contain exactly
the same data as written to the underlying changelog. Thus, if your
update was buffered but never sent, the store will not contain the
update after restore and thus the record will not be considered a
duplicate.


-Matthias

On 2/2/17 12:55 PM, Eno Thereska wrote:
Hi Krzysztof,

There are several scenarios where you want a set of records to be
sent atomically (to a statestore, downstream topics etc). In case of
failure then, either all of them commit successfully, or none does.
We are working to add exactly-once processing to Kafka Streams and I
suspect you'll find that useful. It will build on the KIP-98 that is
currently being discussed, on exactly once delivery and transactional
messaging.

Thanks
Eno

On 2 Feb 2017, at 20:01, Krzysztof Lesniewski, Nexiot AG
<krzysztof.lesniew...@nexiot.ch> wrote:

Thank you Matthias for your answer.

Of course, wherever it is possible I will use idempotent updates,
but unfortunately it does not apply to all my cases.

I thought before about the alternative to idempotent updates you have
proposed, but I think it carries a risk of breaking at-least-once
delivery semantics in rare cases. Given that state store changelog
is flushed first, if an application crashes after flushing the state
store, but before flushing all the produced records, a reprocessed
record may be discarded (considered a duplicate), even though there
is a chance its processing result never reached the destination
topic. That is:

1. A record with offset (or other identifier) A is processed. State
    store updates the state and Id to A
2. On commit:
     1. state store changes are flushed
     2. server crashes: result of processing record A was buffered, but
        never sent
3. State store is restored
4. Record with offset A is reprocessed, but because its Id is already in
    the state store, it is considered a duplicate and discarded

An extension to your solution would be to additionally store a map
of states for past messages <K,<V, Id, Map<Id, V>>>, and then if
offset in the state is greater than the current record's offset,
state valid for the current record can be retrieved from the map. Of
course it adds extra complexity, as the map has to be maintained to
not grow indefinitely. An easy solution to removing old records
would be to access committed offset and delete all entries before
it, but I did not find an easy way to access the committed offset.

Is my thinking correct here? How could I maintain such state store
and are there other gotchas I should pay attention to?

Kind Regards
*Krzysztof Lesniewski*

On 02.02.2017 18:59, Matthias J. Sax wrote:
Hi,

About message acks: writes will be acked, however async (not sync as you
describe it). Only before an actual commit, KafkaProducer#flush is
called and all not-yet received acks are collected (ie, blocking/sync)
before the commit is done.
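
As a toy model of that ordering (this is not Kafka's API; `ToyProducer` and `commit` are made-up names, and the acknowledgement is simulated in memory):

```python
class ToyProducer:
    """In-memory stand-in for a producer with asynchronous acks."""

    def __init__(self):
        self.pending = []  # sent, acknowledgement not yet received
        self.acked = []    # acknowledged by the imaginary broker

    def send(self, record):
        # Returns immediately; the ack arrives later.
        self.pending.append(record)

    def flush(self):
        # Blocks until every pending record has been acknowledged.
        self.acked.extend(self.pending)
        self.pending.clear()


def commit(producer, committed_offsets, offset):
    """Flush all outstanding writes first, only then record the offset."""
    producer.flush()
    committed_offsets.append(offset)


producer = ToyProducer()
committed_offsets = []

producer.send(("changelog", "K", "Y"))
producer.send(("topic-B", "K", "result"))
print(len(producer.acked))  # 0, nothing acknowledged yet

commit(producer, committed_offsets, 1)
print(len(producer.acked), committed_offsets)  # 2 [1]
```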

About state guarantees: there are none -- state might be between X and Z
(that's what "at-least-once" can provide -- not more). Thus, it's
recommended to apply idempotent updates to the stores if possible.

As an alternative, you might also be able to add the latest "update
record" to the state itself to deduplicate in failure case. For example,
instead of putting <K,V> in the state, you put <K,<V,Id>> with id being a
record id that did the last modification on V. Thus, each time before
you update the state, you can check if you did already "add" the current
record to the state (for example, you could use the record offset as id)
-- if the offset in the state is not smaller than the current record's
offset, the current record is a duplicate.
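
A small sketch of that check, using a plain dict in place of the state store. The names `apply_update` and `sensor-1` are hypothetical, and the sketch assumes all records for a key come from the same partition, since offsets are only comparable within one partition.

```python
def apply_update(store, key, offset, value):
    """Add value to the aggregate for key unless this offset was applied before.

    store maps key -> (aggregate, last_applied_offset).
    Returns True if the state changed, False if the record is a duplicate.
    """
    aggregate, last_offset = store.get(key, (0, -1))
    if last_offset >= offset:
        # Offset in the state is not smaller than the record's: duplicate.
        return False
    store[key] = (aggregate + value, offset)
    return True


store = {}
batch = [("sensor-1", 0, 5), ("sensor-1", 1, 7)]

for key, offset, value in batch:
    apply_update(store, key, offset, value)
print(store)  # {'sensor-1': (12, 1)}

# Reprocessing the same records after a failure leaves the state untouched.
replayed = [apply_update(store, key, offset, value) for key, offset, value in batch]
print(replayed, store)  # [False, False] {'sensor-1': (12, 1)}
```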


-Matthias



On 2/2/17 6:38 AM, Krzysztof Lesniewski, Nexiot AG wrote:
Hello,

In multiple sources I read that Kafka Streams has at-least-once delivery
semantics, meaning that in case of failure, a message can be processed
more than once, but it will not be lost. It is achieved by committing
offset only after the message processing is completely finished and all
intermediate state/data is flushed to a reliable storage. There are
however some details I want to verify, especially related to Kafka
Streams' State Store.

Let's take as an example a Kafka Streams application, which consumes
messages from topic A, processes the messages using state store S and
outputs the processing results to topic B. Ensuring at-least-once
delivery semantics would mean following order:

1. Message is processed
2. Updated state of state store S (update caused by applying processed
     message) is sent to backing changelog topic *and acknowledged*
3. Processing result is sent to topic B *and acknowledged*
4. Offset of topic A is committed (may be deferred)

Acknowledging is necessary, as otherwise offset could be committed, even
though messages or state were not successfully submitted, e.g. because
of batching or because there are fewer available replicas than
/min.insync.replicas/ and messages are being buffered. Am I correct here?

Then, regarding state store, what state will I end up with after a
failure? Let's assume the following scenario:

1. I consume message K, and process it in context of state X.
     Processing the message resulted in changing the state from X to Y.
2. I consume the next message, message L, and process it in context of
     state Y. Processing the message resulted in changing the state from
     Y to Z.
3. Server crashed. As no offset was committed, I will start processing
     again from message K.

What state will I end up with when processing the message K again? Are
there any guarantees that the state store will give me state X, or will
it be any state between X and Z? This is very important, as reprocessing
the message may return different results when evaluating in a context of
a different state.


