Rhishikesh Joshi created KAFKA-18069:
----------------------------------------

             Summary: Out of order output from windowed aggregations
                 Key: KAFKA-18069
                 URL: https://issues.apache.org/jira/browse/KAFKA-18069
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.9.0
         Environment: 1 Kafka broker running in Docker.
Streams app written in Clojure using Java library version 3.9.0
            Reporter: Rhishikesh Joshi


We have an input topic that receives many messages (say 10k) every 250ms. The
key is a unique ID from a fixed set of IDs, so each batch of messages at a 250ms
mark contains some subset of the overall keys (product IDs, etc.).
We have a Kafka Streams app running windowed aggregations over it. The
aggregations group by the same key as the input topic's key and aggregate over
10s windows, resulting in one value per key for every 10-second window.


We are using suppression with `untilWindowCloses` so that we get only one value
per grouping key for every aggregation window.
We are not using any grace period, since messages on our input topic are
guaranteed to be in timestamp order (the timestamp is a property of the event
itself).
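To make the expected behaviour concrete, here is a minimal, simplified Java model of a single partition. It is a sketch under stated assumptions, not Kafka Streams code: the class `WindowCloseSketch` is hypothetical, records are assigned to 10s tumbling windows and counted per key, and a window's final counts are emitted only once stream time reaches window end plus grace (0 here, as in our setup). With in-order input, final results come out oldest window first, which is the per-partition ordering we expect on the output topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-partition model of a 10 s tumbling-window count whose
// final result is emitted only after stream time passes window end + grace.
// It illustrates the expected semantics; it is not Kafka Streams' code.
final class WindowCloseSketch {
    static final long WINDOW_MS = 10_000L;
    static final long GRACE_MS = 0L; // no grace period, as in the report

    // Start of the tumbling window containing ts (assumes ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Open windows ordered by start time; per window, a count per key.
    private final TreeMap<Long, TreeMap<String, Long>> open = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    void process(String key, long ts) {
        streamTime = Math.max(streamTime, ts);
        long start = windowStart(ts);
        if (start + WINDOW_MS + GRACE_MS <= streamTime) {
            return; // window already closed: the record is dropped as late
        }
        open.computeIfAbsent(start, s -> new TreeMap<>()).merge(key, 1L, Long::sum);
        // Emit every window whose close time stream time has now reached,
        // oldest first, with one final value per key.
        while (!open.isEmpty() && open.firstKey() + WINDOW_MS + GRACE_MS <= streamTime) {
            Map.Entry<Long, TreeMap<String, Long>> closed = open.pollFirstEntry();
            for (Map.Entry<String, Long> e : closed.getValue().entrySet()) {
                emitted.add(closed.getKey() + ":" + e.getKey() + "=" + e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        WindowCloseSketch p = new WindowCloseSketch();
        // Two keys, one record each every 250 ms, for 30 s of event time.
        for (long ts = 0; ts < 30_000; ts += 250) {
            p.process("a", ts);
            p.process("b", ts);
        }
        System.out.println(p.emitted);
    }
}
```

Running `main` prints `[0:a=40, 0:b=40, 10000:a=40, 10000:b=40]`. The window starting at 20000 stays open in this model because no later record advances stream time to 30000.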
All topics have 3 partitions each.
Say we have 2 application instances running these stream topologies, with the
number of stream threads set to 3.
Our problem seems to be that in the event of a rebalance, for example if one of
the apps terminates or restarts for some other reason, we see out-of-order
messages on individual partitions of the output topic.
For example, messages for the 10:00:00 aggregation window end up on the
partition after the messages for the 10:00:20 window.
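One way to detect this symptom is to read the output topic and check, per partition, that window start times never go backwards. This is a minimal sketch, assuming the consumed records have already been reduced to (partition, window start in ms) pairs in read order; `OrderingCheck` and `Out` are hypothetical names, and equal window starts are allowed because many keys share a window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical checker: on each output partition, window start times
// should never decrease in the order the records were read.
final class OrderingCheck {
    // One output record reduced to its partition and window start (ms).
    static final class Out {
        final int partition;
        final long windowStartMs;

        Out(int partition, long windowStartMs) {
            this.partition = partition;
            this.windowStartMs = windowStartMs;
        }
    }

    // Describes every record whose window start is earlier than the latest
    // window start already seen on the same partition.
    static List<String> violations(List<Out> records) {
        Map<Integer, Long> latest = new HashMap<>();
        List<String> problems = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            Out r = records.get(i);
            Long prev = latest.get(r.partition);
            if (prev != null && r.windowStartMs < prev) {
                problems.add("record " + i + ": partition " + r.partition
                        + " window " + r.windowStartMs + " after " + prev);
            } else {
                latest.put(r.partition, r.windowStartMs); // running maximum
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Window 0 appearing after window 20000 on partition 0 is flagged.
        List<Out> read = List.of(new Out(0, 0), new Out(1, 0),
                new Out(0, 20_000), new Out(0, 0));
        System.out.println(violations(read));
    }
}
```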

We are using processing.guarantee = exactly_once_v2
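For reference, the Streams settings described above can be collected as Java `Properties`; this is a sketch only. The keys are the standard Kafka Streams config names (usually referenced through the `StreamsConfig` constants noted in the comments), while `ReportedConfig`, the application id and the bootstrap address are hypothetical placeholders. The partition count is a property of the topics themselves, not of this config.

```java
import java.util.Properties;

// Hypothetical helper gathering the settings mentioned in the report.
final class ReportedConfig {
    static Properties streamsProps(String appId, String bootstrap) {
        Properties p = new Properties();
        p.setProperty("application.id", appId);                   // StreamsConfig.APPLICATION_ID_CONFIG
        p.setProperty("bootstrap.servers", bootstrap);            // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        p.setProperty("processing.guarantee", "exactly_once_v2"); // StreamsConfig.PROCESSING_GUARANTEE_CONFIG
        p.setProperty("num.stream.threads", "3");                 // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps("windowed-agg-app", "localhost:9092"));
    }
}
```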

 

Further,

It looks like there is an even simpler way to reproduce this issue.
If we run just 1 Streams app instance with 3 stream threads (the input topic has
3 partitions), stop it by killing it, and then restart the same app, we start
seeing out-of-order messages on the output topic: aggregations for an earlier
window end up on the partition after those for a later window.
Our Streams app does have an uncaught exception handler (a Java one, not the
Streams uncaught exception handler), and we make sure to close the streams
cleanly in that case. I can confirm that the state goes from PENDING_SHUTDOWN to
NOT_RUNNING, so I suppose there is no unclean or uncommitted state anywhere.
Yet when I start the Streams app back up, there are out-of-order messages on the
output.

 

Suggestion from Matthias J. Sax on Slack:
{quote}The only other thing you could try is, to move off suppress() and use 
windowedBy(...).emitStrategy(...).aggregate(...) instead
{quote}
 

I tried this, but it does not help either, so maybe the issue is not specific to
suppression.

 

This seems like a pretty routine use case that is failing. Or am I wrong in
assuming that Kafka Streams can guarantee output ordering for windowed
aggregations?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
