Rhishikesh Joshi created KAFKA-18069:
----------------------------------------
             Summary: Out of order output from windowed aggregations
                 Key: KAFKA-18069
                 URL: https://issues.apache.org/jira/browse/KAFKA-18069
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.9.0
         Environment: 1 Kafka broker running in Docker. Streams app written in Clojure using the Java library, version 3.9.0
            Reporter: Rhishikesh Joshi

We have an input topic that receives many messages (say 10k) every 250 ms. The key is a unique ID from a fixed set of IDs, so each 250 ms batch carries some subset of the overall keys (product IDs, etc.).

We run a Kafka Streams app performing windowed aggregations over this topic. Records are grouped by the same key as the input topic and aggregated into 10-second windows, producing one value per key per window. We use suppression with `untilWindowCloses` so that only one value is emitted per grouping key for each aggregation window. We do not use a grace period, since the input topic is guaranteed to send messages in timestamp order (the timestamp is a property of the event itself).

All topics have 3 partitions. Say we have 2 application instances running this stream topology, each with the number of stream threads set to 3.

Our problem: in the event of a rebalance, for example when one of the app instances terminates or restarts for some reason, we see out-of-order messages on individual partitions of the output topic. For example, messages for the aggregation window at 10:00:00 land on the partition after the messages for the window at 10:00:20.

We are using processing.guarantee = exactly_once_v2.

There is an even simpler way to reproduce the issue: run a single streams app instance with 3 stream threads (the input topic has 3 partitions), stop the app by killing it, and then restart it. After the restart we start seeing out-of-order messages on the output topic, i.e. window aggregations for an earlier window end up on the partition after those for a later window.

Our streams app does have an uncaught exception handler (a plain Java one, not the Streams uncaught handler), and we ensure that the streams instance is closed cleanly in that case. I can confirm that the state goes from PENDING_SHUTDOWN to NOT_RUNNING, so I assume there is no unclean or uncommitted state anywhere. Yet when I start the streams app back up, there are out-of-order messages on the output.

Suggestion from Matthias J. Sax on Slack:
{quote}The only other thing you could try is, to move off suppress() and use windowedBy(...).emitStrategy(...).aggregate(...) instead{quote}
We tried this, but it does not help either, so perhaps the issue is not related only to suppression.

This seems like a fairly routine use case that is failing. Or am I wrong in assuming that Kafka Streams can guarantee the output ordering of windowed aggregations?
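For illustration, here is a minimal Java sketch of the topology described above. Topic names, serdes, the sum reducer, the application id, and the bootstrap server are placeholders rather than our actual Clojure code; the relevant parts are the 10-second windows with no grace period, suppress(untilWindowCloses), and exactly_once_v2.

{code:java}
// Minimal sketch of the described topology. Names marked "placeholder" are
// assumptions for illustration, not the real application's values.
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedAggregationRepro {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-agg-repro"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()))
               // group by the same key as the input topic
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
               // 10-second tumbling windows, no grace period
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
               // sum is a placeholder for the real aggregation
               .reduce(Long::sum, Materialized.with(Serdes.String(), Serdes.Long()))
               // emit exactly one final result per key per window
               // (the suggested alternative would replace this with
               //  .emitStrategy(EmitStrategy.onWindowClose()) on the windowed stream)
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               // unwrap the windowed key so the output is keyed by the original key
               .toStream((windowedKey, value) -> windowedKey.key())
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
{code}

The simple reproduction described above amounts to running one instance of a topology like this against 3-partition topics, killing it, and restarting it.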