[ 
https://issues.apache.org/jira/browse/KAFKA-18069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17906243#comment-17906243
 ] 

Khoa Nguyen commented on KAFKA-18069:
-------------------------------------

Hi [~rhishikeshj], can I take this one?

> Out of order output from windowed aggregations
> ----------------------------------------------
>
>                 Key: KAFKA-18069
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18069
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.9.0
>         Environment: 1 Kafka broker running in Docker.
> Streams app written in Clojure using the Java lib version 3.9.0
>            Reporter: Rhishikesh Joshi
>            Priority: Major
>
> We have an input topic that receives many messages (say 10k) every 250ms. 
> Each message is keyed by a unique ID from a fixed set of IDs (product IDs, 
> etc.), so every batch of messages at the 250ms mark carries some subset of 
> the overall keys.
> We have a Kafka Streams app running windowed aggregations over it. The 
> aggregation groups by the same key as the input topic and aggregates over 
> 10-second windows, producing one value per key for every 10-second window.
> We use suppression with `untilWindowCloses` so that we get only one value 
> per grouping key for every aggregation window (a minimal sketch of this 
> topology is included at the end of this description).
> We are not using any grace period, since our input topic is guaranteed to 
> send messages in timestamp order (the timestamp is a property of the event 
> itself).
> All topics have 3 partitions.
> Say we have 2 application instances running this stream topology, each with 
> the number of stream threads set to 3.
> Now our problem seems to be that, in the event of a rebalance, say if one of 
> the app instances terminates or restarts for some other reason, we see 
> out-of-order messages on individual partitions of the output topic.
> For example, messages for the 10:00:00 aggregation window end up on the 
> partition after the messages for the 10:00:20 window.
> We are using processing.guarantee = exactly_once_v2
>  
> Further,
> It looks like there's an even simpler way to reproduce this issue.
> If we run just 1 streams app instance with 3 stream threads (3 partitions on 
> the input topic), stop the streams app by killing it, and then restart the 
> same app, we start seeing out-of-order messages on the output topic: window 
> aggregations for an earlier window end up on the partition after those for a 
> later window.
> In our streams app, we do have an uncaught exception handler (a Java one, not 
> the Streams uncaught handler), and we ensure that we close the streams cleanly 
> in this case. I can confirm that I see the state go from PENDING_SHUTDOWN to 
> NOT_RUNNING, so I suppose there is no unclean or uncommitted state anywhere.
> Yet when I start the streams app back up, there are out-of-order messages on 
> the output.
>  
> Suggestion from Matthias Sax on slack
> {quote}The only other thing you could try is, to move off suppress() and use 
> windowedBy(...).emitStrategy(...).aggregate(...) instead
> {quote}
>  
> Tried this, but it also does not help, so maybe it's not related just to 
> suppression (a sketch of this emitStrategy variant is also included below).
>  
> It seems like a pretty routine use case is failing. Or am I wrong in assuming 
> that Kafka Streams can guarantee output ordering of windowed aggregations?
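>  
> For reference, here is a minimal sketch of the topology described above, in 
> the Java DSL (our actual app is in Clojure; topic names, serdes, the 
> aggregator, and config values below are placeholders, just to show the shape):
> {code:java}
> import java.time.Duration;
> import java.util.Properties;
> 
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Grouped;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.kstream.Produced;
> import org.apache.kafka.streams.kstream.Suppressed;
> import org.apache.kafka.streams.kstream.TimeWindows;
> import org.apache.kafka.streams.kstream.WindowedSerdes;
> 
> public class WindowedAggregationSketch {
> 
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-agg-app");      // placeholder
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
>         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
> 
>         StreamsBuilder builder = new StreamsBuilder();
> 
>         builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Double()))
>                // group by the same key as the input topic
>                .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
>                // 10-second tumbling windows, no grace period
>                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
>                .aggregate(
>                    () -> 0.0,
>                    (key, value, agg) -> agg + value,                             // placeholder aggregator
>                    Materialized.with(Serdes.String(), Serdes.Double()))
>                // emit exactly one final result per key per window
>                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>                .toStream()
>                .to("output-topic",
>                    Produced.with(
>                        WindowedSerdes.timeWindowedSerdeFrom(String.class, 10_000L),
>                        Serdes.Double()));
> 
>         KafkaStreams streams = new KafkaStreams(builder.build(), props);
>         streams.start();
>         // close cleanly on shutdown, mirroring what our exception handler does
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>     }
> }
> {code}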
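>  
> And the emitStrategy variant we tried instead of suppress() (same imports and 
> config as the sketch above, plus EmitStrategy; again just a sketch with 
> placeholder names):
> {code:java}
> import org.apache.kafka.streams.kstream.EmitStrategy;
> 
> builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Double()))
>        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
>        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
>        // emit only the final result when the window closes, instead of suppress()
>        .emitStrategy(EmitStrategy.onWindowClose())
>        .aggregate(
>            () -> 0.0,
>            (key, value, agg) -> agg + value,                                     // placeholder aggregator
>            Materialized.with(Serdes.String(), Serdes.Double()))
>        .toStream()
>        .to("output-topic",
>            Produced.with(
>                WindowedSerdes.timeWindowedSerdeFrom(String.class, 10_000L),
>                Serdes.Double()));
> {code}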



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
