I asked essentially the same question a week or two ago. The answer was "this is expected behaviour unless you switch on exactly-once processing".
(In my case the problem was solved by changing the topology, which I had to do anyway for other, unconnected reasons: the requirements for the application changed while I was part way through writing it.)

Tim Ward

-----Original Message-----
From: Alex Brekken <brek...@gmail.com>
Sent: 19 August 2019 20:24
To: users@kafka.apache.org
Subject: Kafka Streams incorrect aggregation results when re-balancing occurs

Hi all,

I have a relatively simple streams topology that produces some counts, and while testing this code I'm seeing occasional incorrect aggregation results. This seems to happen when a rebalance occurs, typically due to a timeout or communication hiccup with the Kafka broker. The topology is built with the DSL and uses two KTables: the first is really just a de-dup table, and the second holds the result of the aggregation. At a high level, the topology consumes from a source topic, does a groupByKey() and then a reduce() that always returns the new value, then a groupBy() on a new key, and finally an aggregate() call with an adder and a subtractor. Because our source topic frequently contains duplicate messages, this seemed like a good way to handle the dupes: the subtractor is invoked whenever we replace a value in the "upstream" KTable and removes the old value from the count, and the adder then adds the new value back in.

In the happy-path scenario, where we never see any exceptions or rebalances, this whole thing works great: the counts at the end are 100% correct. But when rebalancing is triggered we sometimes get bad counts. My theory is that when a timeout or connectivity problem occurs during that second aggregation, the data gets saved to the state store but the offsets are never committed, so the message(s) in the repartition topic that feed the aggregation are replayed after the stream task is rebalanced, causing the counts to be incorrectly incremented or decremented (depending on whether the message triggered the adder or the subtractor).

I can simulate this problem (or something similar to it) when debugging the application in my IDE simply by pausing execution on a breakpoint inside the aggregation's adder or subtractor method for a few seconds. The result of the adder or subtractor is saved to the state store, which means that when the messages in the repartition topic are re-processed, the counts get doubled. If I enable "exactly_once" processing I'm unable to recreate the problem and the counts are always accurate.

My questions are:

1. Is this expected behavior? In a hostile application environment where connectivity problems and rebalances happen frequently, is some degree of incorrectly aggregated data just a fact of life?

2. Is exactly_once processing the right solution if correctness is of the highest importance? Or should I be looking at different ways of writing the topology?

Thanks for any advice!

Alex
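For concreteness, here is a minimal sketch of the topology described above, written against the Kafka Streams DSL. The topic names, String types, serdes and the extractGroupKey() helper are all invented for illustration; only the shape of the topology (stream, groupByKey, reduce, groupBy, aggregate with adder and subtractor) comes from the original message:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class CountTopologySketch {

    // Hypothetical re-keying function; stands in for however the new key
    // is actually derived from the value.
    static String extractGroupKey(String value) {
        return value.split(":")[0];
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // KTable #1: effectively a de-dup table, because reduce() always
        // keeps the newest value for each key.
        KTable<String, String> latest = builder
                .stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

        // KTable #2: re-key, then count with an adder and a subtractor.
        // When a value is replaced upstream, Streams sends the aggregation a
        // retraction of the old value (subtractor) plus the new value (adder).
        KTable<String, Long> counts = latest
                .groupBy((key, value) -> KeyValue.pair(extractGroupKey(value), value),
                         Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                        () -> 0L,                             // initializer
                        (key, value, total) -> total + 1,     // adder
                        (key, oldValue, total) -> total - 1,  // subtractor
                        Materialized.with(Serdes.String(), Serdes.Long()));

        counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}

The failure mode described above then makes sense: a replayed repartition record re-runs the adder or subtractor against a state store that already reflects it, so the count drifts by one in either direction.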
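Switching on exactly-once processing, as suggested in the reply, is a single configuration change. A minimal sketch, with the application id and bootstrap server as placeholders (StreamsConfig.EXACTLY_ONCE is the constant behind the "exactly_once" value):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // The default guarantee is at-least-once; exactly_once wraps the
        // offset commit, state store changelog writes and output records in
        // a single Kafka transaction, so a replay after a rebalance cannot
        // double-apply an adder or subtractor result.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        KafkaStreams streams = new KafkaStreams(CountTopologySketch.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Note that exactly-once is not free: it uses transactional producers and lowers the default commit.interval.ms from 30 seconds to 100 ms, so it is worth benchmarking the throughput impact before adopting it wholesale.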