Thanks guys. I knew that re-processing messages was a possibility with at_least_once processing, but I guess I hadn't considered the potential impact on the state stores as far as aggregations are concerned. So with exactly_once, it must roll back the uncommitted state store changes in a failure scenario? I haven't dug into the code to see how it works, but given the behavior I'm seeing, it must.
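For anyone else who hits this, the switch itself is just a config change. A minimal sketch, with the application id and bootstrap servers as placeholders rather than our real values:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ExactlyOnceConfigSketch {
        public static Properties buildConfig() {
            Properties props = new Properties();
            // Placeholder application id and broker address, just for illustration.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-aggregator");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Switch from the default at_least_once to exactly_once. With this setting,
            // state store updates (via their changelog topics), repartition topic writes,
            // and offset commits are all part of one transaction, so a failure before the
            // commit can't leave the store ahead of the committed offsets.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return props;
        }
    }

From what I can tell from the docs, enabling this also drops the default commit interval to 100ms, so there is some throughput cost to weigh.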
Tim - I actually saw your related question from last week right after I sent mine. :)

Alex

On Tue, Aug 20, 2019 at 2:28 PM Bruno Cadonna <br...@confluent.io> wrote:
> Hi Alex,
>
> What you describe about failing before offsets are committed is one reason
> why records are processed multiple times under the at-least-once processing
> guarantee. That is a reality of life, as you stated. Kafka Streams in
> exactly-once mode guarantees that these duplicate state updates do not
> happen.
>
> The exactly-once processing guarantee was implemented in Kafka Streams for
> use cases where correctness is of the highest importance.
>
> Best,
> Bruno
>
> On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <brek...@gmail.com> wrote:
> >
> > Hi all, I have a (relatively) simple streams topology that is producing
> > some counts, and while testing this code I'm seeing occasional incorrect
> > aggregation results. This seems to happen when a rebalance occurs -
> > typically due to a timeout or communication hiccup with the Kafka broker.
> > The topology is built with the DSL and uses 2 KTables: the first is
> > really just a de-dup table, and the second holds the result of the
> > aggregation. At a high level, the topology consumes from a source topic,
> > calls groupByKey() and then reduce() where we always return the newValue,
> > then does a groupBy() on a new key, and finally an aggregate() call with
> > an adder and a subtractor. Because our source topic frequently contains
> > duplicate messages, this seemed like a good way to handle the dupes: the
> > subtractor gets invoked any time we replace a value in the "upstream"
> > KTable and removes it from the count, and the adder then adds it back in.
> >
> > In the happy-path scenario where we never see any exceptions or
> > rebalances, this whole thing works great - the counts at the end are 100%
> > correct. But when rebalancing is triggered we sometimes get bad counts.
> > My theory is that when a timeout or connectivity problem happens during
> > that second aggregation, the data ends up getting saved to the state
> > store but the offsets don't get committed, and the message(s) in the
> > repartition topic that feed the aggregation get replayed after the stream
> > task gets rebalanced, causing the counts to get incorrectly incremented
> > or decremented (depending on whether the message was triggering the adder
> > or the subtractor). I can simulate this problem (or something similar to
> > it) when debugging the application in my IDE just by pausing execution on
> > a breakpoint inside the aggregation's adder or subtractor method for a
> > few seconds. The result of the adder or subtractor gets saved to the
> > state store, which means that when the messages in the repartition topic
> > get re-processed, the counts get doubled. If I enable "exactly_once"
> > processing, I'm unable to recreate the problem and the counts are always
> > accurate.
> >
> > My questions are:
> >
> > 1. Is this expected behavior? In a hostile application environment where
> > connectivity problems and rebalances happen frequently, is some degree of
> > incorrectly aggregated data just a reality of life?
> >
> > 2. Is exactly_once processing the right solution if correctness is of the
> > highest importance? Or should I be looking at different ways of writing
> > the topology?
> >
> > Thanks for any advice!
> >
> > Alex
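P.S. In case it's useful to anyone who finds this thread later, the shape of the topology described above is roughly the following. This is a simplified sketch only - the topic names, the String/Long types, and the re-keying logic (grouping on the value itself) are placeholders rather than our actual code:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    public class DedupCountTopologySketch {

        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();

            // De-dup KTable: reduce() always keeps the newest value, so every later
            // replacement of a key drives the subtractor in the aggregation below.
            KTable<String, String> deduped = builder
                .stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

            // Count KTable: re-key (placeholder: group on the value itself) and count.
            // The adder increments the count for the new value's key, the subtractor
            // decrements it for the old value's key.
            KTable<String, Long> counts = deduped
                .groupBy((key, value) -> KeyValue.pair(value, value),
                         Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                    () -> 0L,
                    (aggKey, newValue, aggregate) -> aggregate + 1,
                    (aggKey, oldValue, aggregate) -> aggregate - 1,
                    Materialized.with(Serdes.String(), Serdes.Long()));

            counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
            return builder.build();
        }
    }

The intent of the reduce()/aggregate() pair is that when a new record replaces an existing entry in the de-dup KTable, the subtractor removes the old value's contribution from the count and the adder then counts the new one, so duplicates don't inflate the totals.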