[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853118#comment-16853118
 ] 

Mateusz Owczarek commented on KAFKA-7994:
-----------------------------------------

Bruno's example above indicates it actually does affect `suppress` operator in 
terms of producing only 'final window result'. For me the keyword 'final' 
suggests that one and only one result will be produced, whereas one simple 
rebalancing may cause multiple results of the same window.

Only recently I've been using `suppress` operator in my application and found 
out this behavior, so I've created a pull request for my Kafka Streams example 
repository describing the steps to simply reproduce multiple `suppress` results:
https://github.com/mowczare/kafka-streams-scala/pull/1

> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
>                 Key: KAFKA-7994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>         Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to