[ https://issues.apache.org/jira/browse/KAFKA-9368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009121#comment-17009121 ]

Sophie Blee-Goldman commented on KAFKA-9368:
--------------------------------------------

In KAFKA-7994 we fixed the partition-time for rebalances/restarts, but since 
stream-time is now determined on a per-processor basis and not by the 
partition-time, we should track the issue separately. Thus I split the "preserve 
stream-time" aspect of KAFKA-7994 into a new ticket (see the original ticket for 
the full discussion).
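A minimal sketch of why partition-time and per-processor stream-time can diverge, assuming a hypothetical topology in which a filter sits in front of the processor whose stream-time matters. Partition-time is modeled here simply as the maximum timestamp read from the input partition, and the processor's stream-time as the maximum timestamp it has itself processed. The class `PartitionVsProcessorTimeSketch`, the `track` helper, and the filter are illustrative names, not Kafka Streams APIs.

{code:java}
import java.util.function.LongPredicate;

// Hypothetical sketch, not Kafka Streams code: partition-time vs. the stream-time
// of a single processor that sits behind an upstream filter.
public class PartitionVsProcessorTimeSketch {

    static final long UNKNOWN = -1L; // nothing observed yet

    // Returns {partitionTime, downstreamStreamTime} after reading all timestamps.
    static long[] track(long[] timestamps, LongPredicate upstreamFilter) {
        long partitionTime = UNKNOWN;
        long downstreamStreamTime = UNKNOWN;
        for (long ts : timestamps) {
            // Every record read from the partition advances partition-time.
            partitionTime = Math.max(partitionTime, ts);
            // Only records that pass the filter reach the processor and advance its stream-time.
            if (upstreamFilter.test(ts)) {
                downstreamStreamTime = Math.max(downstreamStreamTime, ts);
            }
        }
        return new long[] {partitionTime, downstreamStreamTime};
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 5, 11, 2}; // r1(0) r2(5) r3(11) r4(2)
        // Hypothetical filter that happens to drop r3 before it reaches the processor.
        long[] times = track(timestamps, ts -> ts != 11);
        System.out.println("partition-time=" + times[0] + ", downstream stream-time=" + times[1]);
    }
}
{code}

With the filter dropping `r3`, the sketch prints `partition-time=11, downstream stream-time=5`. In this model, restoring partition-time alone would not recover the processor's own stream-time, which is the reason for tracking the two separately.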

> Preserve stream-time across rebalances/restarts
> -----------------------------------------------
>
>                 Key: KAFKA-9368
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9368
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>
> Stream-time is used to decide whether to process out-of-order records or to 
> drop them if they are late (i.e., timestamp < stream-time - grace-period). 
> This is currently tracked on a per-processor basis, such that each node has 
> its own local view of stream-time based on the maximum timestamp it has 
> processed.
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., 
> -1) for all processors in tasks that are newly created (or migrated). In 
> effect, we forget the current stream-time in this case, which may lead to 
> non-deterministic behavior: if we stop processing right before a late record 
> that would have been dropped had we continued processing, that record is not 
> dropped after the rebalance/restart. Let's look at an example with a grace 
> period of 5ms for a tumbling window of 5ms, and the following records 
> (timestamps in parentheses):
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that, from a global point of view, stream-time then 
> advances differently: 0, 5, 11, 2.
> Of course, this is a corner case: if we stopped processing one record 
> earlier (i.e., after processing `r2` but before processing `r3`), `r3` would 
> re-establish stream-time as 11 before `r4` arrives, so stream-time would 
> advance correctly from a global point of view.
> Note that in previous versions, the maximum partition-time was actually used 
> as stream-time. This changed in 2.3 due to KAFKA-7895/[PR 6278|https://github.com/apache/kafka/pull/6278], 
> and could potentially change yet again in future versions (cf. KAFKA-8769). 
> Partition-time is actually preserved as of 2.4 thanks to KAFKA-7994.
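The example above can be replayed with a small simulation. This is a hypothetical sketch, not Kafka Streams code: a single processor keeps its own stream-time as the maximum timestamp it has processed, drops a record when timestamp < stream-time - grace-period (the simplified rule from the description; the 5ms window size itself is not modeled), and a rebalance/restart is modeled by recreating the processor with stream-time UNKNOWN (-1). The names `StreamTimeRestartSketch`, `Processor`, `Result`, and `run` are illustrative only.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka Streams code: one processor with its own stream-time.
public class StreamTimeRestartSketch {

    static final long UNKNOWN = -1L; // stream-time of a newly created (or migrated) task

    static final class Processor {
        private final long gracePeriodMs;
        private long streamTime = UNKNOWN;

        Processor(long gracePeriodMs) {
            this.gracePeriodMs = gracePeriodMs;
        }

        // Returns false if the record is dropped as late, true if it is processed.
        boolean process(long timestamp) {
            if (streamTime != UNKNOWN && timestamp < streamTime - gracePeriodMs) {
                return false; // late: a dropped record does not advance stream-time
            }
            streamTime = Math.max(streamTime, timestamp);
            return true;
        }

        long streamTime() {
            return streamTime;
        }
    }

    // Outcome of one run: stream-time after each record and the names of dropped records.
    static final class Result {
        final List<Long> streamTimes = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();
    }

    // Processes records in order; a "restart" right before index restartBeforeIndex
    // recreates the processor, so its stream-time is forgotten (-1 means no restart).
    static Result run(long[] timestamps, long gracePeriodMs, int restartBeforeIndex) {
        Result result = new Result();
        Processor processor = new Processor(gracePeriodMs);
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartBeforeIndex) {
                processor = new Processor(gracePeriodMs);
            }
            if (!processor.process(timestamps[i])) {
                result.dropped.add("r" + (i + 1));
            }
            result.streamTimes.add(processor.streamTime());
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 5, 11, 2}; // r1(0) r2(5) r3(11) r4(2)
        long gracePeriodMs = 5;
        for (int restartBeforeIndex : new int[] {-1, 2, 3}) {
            Result r = run(timestamps, gracePeriodMs, restartBeforeIndex);
            System.out.println("restart before index " + restartBeforeIndex
                    + ": stream-time " + r.streamTimes + ", dropped " + r.dropped);
        }
    }
}
{code}

In this sketch, `r4` is dropped both without a restart and with a restart before `r3` (stream-time 0, 5, 11, 11), but it is processed when the restart happens right before it (stream-time 0, 5, 11, 2), reproducing the non-determinism described above.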



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
