[ https://issues.apache.org/jira/browse/KAFKA-18069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17906243#comment-17906243 ]
Khoa Nguyen commented on KAFKA-18069:
-------------------------------------

Hi [~rhishikeshj], can I take this one?

> Out of order output from windowed aggregations
> ----------------------------------------------
>
>                 Key: KAFKA-18069
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18069
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.9.0
>         Environment: 1 Kafka broker running in Docker.
> Streams app written in Clojure using the Java library, version 3.9.0
>            Reporter: Rhishikesh Joshi
>            Priority: Major
>
> We have an input topic that receives many messages (say 10k) every 250 ms.
> The key is a unique ID from a set of IDs, so every batch of messages at the
> 250 ms mark carries some subset of the overall keys (product IDs, etc.).
> We have a Kafka Streams app running windowed aggregations over it. The
> aggregations group by the same key as the input topic's key and aggregate
> over 10 s windows, producing 1 value per key for every 10 second window.
> We are using suppression with `untilWindowCloses` so that we get only 1 value
> per grouping key for every aggregation window.
> We are not using any grace period, since our input topic is guaranteed to
> receive messages in timestamp order (the timestamp is a property of the event
> itself).
> All topics have 3 partitions each.
> Say we have 2 application instances running these stream topologies, with the
> number of stream threads set to 3.
> Our problem is that, in the event of a rebalance (say, if one of the apps
> terminates or restarts for some other reason), we see out-of-order messages
> on individual partitions of the output topic.
> For example, messages for the 10:00:00 aggregation window land on a partition
> after the messages for the 10:00:20 window.
> We are using processing.guarantee = exactly_once_v2.
>
> Further, it looks like there's an even simpler way to reproduce this issue.
> If we run just 1 streams app instance with 3 stream threads (3 partitions for
> the input topic), stop it by killing it, and then restart the same app, we
> start seeing out-of-order messages on the output topic: aggregations for an
> earlier window end up on the partition after those for a later window.
> In our streams app, we do have an uncaught exception handler (a Java one, not
> the Streams uncaught exception handler), and we ensure that we close the
> streams cleanly in this case. I can confirm that the state goes from
> PENDING_SHUTDOWN to NOT_RUNNING, so I suppose there is no unclean or
> uncommitted state anywhere.
> Yet when I start the streams app back up, there are out-of-order messages on
> the output topic.
>
> Suggestion from Matthias Sax on Slack:
> {quote}The only other thing you could try is to move off suppress() and use
> windowedBy(...).emitStrategy(...).aggregate(...) instead.
> {quote}
>
> I tried this, but it does not help either, so maybe it's not related only to
> suppression.
>
> This seems like a pretty routine use case to be failing. Or am I wrong in
> assuming that Kafka Streams can guarantee output ordering of windowed
> aggregations?
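The report defines "out of order" as a window's result landing on an output partition after the result for a later window. A minimal, dependency-free Java sketch of a consumer-side check for that symptom follows. It assumes 10 s tumbling windows aligned to the epoch (our understanding of how `TimeWindows` assigns tumbling windows) and that the window start of each output record can be recovered, for example from the `Windowed` key the aggregation emits. The names `WindowOrderCheck`, `windowStart`, `OutputRecord` and `findOutOfOrder` are hypothetical helpers, not Kafka APIs, and the sample timestamps are milliseconds since midnight, chosen only for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowOrderCheck {
    // Tumbling window size from the report: 10 seconds.
    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the epoch-aligned tumbling window that contains tsMs (assumption, see lead-in).
    static long windowStart(long tsMs) {
        return tsMs - Math.floorMod(tsMs, WINDOW_SIZE_MS);
    }

    // Hypothetical shape of one record read back from the output topic.
    static final class OutputRecord {
        final int partition;
        final String key;
        final long windowStartMs;

        OutputRecord(int partition, String key, long windowStartMs) {
            this.partition = partition;
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    // Returns the positions of records whose window starts before a window already
    // seen on the same partition. Records must be given in consumption order;
    // ordering across different partitions is not checked.
    static List<Integer> findOutOfOrder(List<OutputRecord> records) {
        Map<Integer, Long> latestWindow = new HashMap<>();
        List<Integer> violations = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            OutputRecord r = records.get(i);
            Long latest = latestWindow.get(r.partition);
            if (latest != null && r.windowStartMs < latest) {
                violations.add(i);
            } else {
                latestWindow.put(r.partition, r.windowStartMs);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        // 10:00:03.250 falls in the [10:00:00, 10:00:10) window.
        System.out.println(windowStart(36_003_250L)); // prints 36000000

        long w0 = 36_000_000L, w10 = 36_010_000L, w20 = 36_020_000L;
        List<OutputRecord> out = List.of(
            new OutputRecord(0, "p1", w0),
            new OutputRecord(0, "p1", w10),
            new OutputRecord(1, "p2", w0),
            new OutputRecord(0, "p1", w20),
            new OutputRecord(0, "p3", w0), // 10:00:00 result after 10:00:20 on partition 0
            new OutputRecord(1, "p2", w10));
        System.out.println(findOutOfOrder(out)); // prints [4]
    }
}
```

Running `main` flags position 4, the 10:00:00 result that arrives on partition 0 after the 10:00:20 result, which mirrors the example in the report. Equal window starts (several keys closing in the same window) are deliberately not flagged, and only per-partition order is checked, since Kafka orders records only within a partition.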