[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-7994:
---------------------------------------
    Description: 
We compute a per-partition partition-time as the maximum timestamp over all 
records processed so far. Before 2.3 this was used to determine the logical 
stream-time, which decides whether out-of-order records are processed or 
dropped as late (i.e., timestamp < stream-time - grace-period). 
Preserving the stream-time is necessary to ensure deterministic results (see 
KAFKA-9368), and although the processor-time is now used instead of 
partition-time, preserving the partition-time is a first step towards improving 
the overall stream-time semantics.

The partition-time is also used by the TimestampExtractor. It gets passed into 
#extract and can be used to determine a rough timestamp estimate if the actual 
timestamp is missing, corrupt, etc. Because partition-time is currently lost on 
rebalance/restart, this means that in the corner case where the next record to 
be processed after a rebalance/restart cannot have its actual timestamp 
determined, we have no way of coming up with a reasonable guess and the record 
will likely have to be dropped.
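
To make the corner case concrete, here is a minimal sketch of the fallback 
decision such an extractor might make. It is a plain-Java stand-in and does not 
use the Kafka API: the class FallbackTimestamp, the method resolve, and the use 
of -1 for an unknown partition-time are assumptions for illustration only.

```java
// Stand-in for the fallback logic an #extract implementation might apply.
// FallbackTimestamp and resolve are hypothetical names, not Kafka API.
final class FallbackTimestamp {
    // Assumption: -1 marks a timestamp or partition-time that is not known.
    static final long UNKNOWN = -1L;

    // Returns the record's own timestamp when it is valid, otherwise the
    // partition-time estimate, otherwise UNKNOWN (the record has no usable time).
    static long resolve(long recordTimestamp, long partitionTime) {
        if (recordTimestamp >= 0) {
            return recordTimestamp;
        }
        return partitionTime >= 0 ? partitionTime : UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(resolve(42L, 100L));    // valid timestamp is used as-is
        System.out.println(resolve(-1L, 100L));    // falls back to partition-time
        System.out.println(resolve(-1L, UNKNOWN)); // after a restart: no estimate left
    }
}
```

The last call is the case described above: with partition-time reset, there is 
nothing to fall back to.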

 

A potential fix would be to store the latest observed partition-time in the 
metadata of committed offsets. This way, on restart/rebalance we can 
re-initialize partition-time correctly.
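
One way this could look, assuming the partition-time is serialized into the 
string metadata that accompanies a committed offset: the sketch below only 
shows a possible encode/decode round trip. The class PartitionTimeMetadata, the 
version byte, and the Base64 layout are assumptions for illustration, not the 
format of any actual patch.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical helper: packs a partition-time into a metadata string and back.
final class PartitionTimeMetadata {
    // Assumption: -1 means "no partition-time known".
    static final long UNKNOWN = -1L;
    // Assumed version marker so the layout could change later.
    private static final byte VERSION = 1;

    // Encodes version byte + 8-byte timestamp as a Base64 string.
    static String encode(long partitionTime) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + Long.BYTES);
        buffer.put(VERSION);
        buffer.putLong(partitionTime);
        return Base64.getEncoder().encodeToString(buffer.array());
    }

    // Decodes the metadata; returns UNKNOWN for missing or unrecognized input.
    static long decode(String metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return UNKNOWN;
        }
        try {
            ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(metadata));
            if (buffer.remaining() != 1 + Long.BYTES || buffer.get() != VERSION) {
                return UNKNOWN;
            }
            return buffer.getLong();
        } catch (IllegalArgumentException e) {
            return UNKNOWN; // not valid Base64, e.g. metadata written by someone else
        }
    }

    public static void main(String[] args) {
        String committed = encode(11L);        // stored alongside the offset
        System.out.println(decode(committed)); // restored on restart/rebalance
        System.out.println(decode(""));        // no metadata: stays UNKNOWN
    }
}
```

Falling back to UNKNOWN on anything unrecognized keeps the current behavior for 
offsets that were committed without this metadata.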

  was:
We compute a per-partition partition-time as the maximum timestamp over all 
records processed so far. Furthermore, we use partition-time to compute 
stream-time for each task as maximum over all partition-times (for all 
corresponding task partitions). This stream-time is used to decide whether 
out-of-order records are processed or dropped as late (i.e., 
timestamp < stream-time - grace-period).

During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., -1) 
for tasks that are newly created (or migrated). In effect, we forget the 
current stream-time in this case, which may lead to non-deterministic behavior: 
if we stop processing right before a late record, that record would be dropped 
if we continued processing, but is not dropped after a rebalance/restart. Let's 
look at an example with a grace period of 5ms for a tumbling window of 5ms, and 
the following records (timestamps in parentheses):

 
{code:java}
r1(0) r2(5) r3(11) r4(2){code}
In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
rebalance after processing `r3` but before processing `r4`, we would 
reinitialize stream-time as -1, and thus would process `r4` on restart/after 
rebalance. The problem is that stream-time then advances differently from a 
global point of view: 0, 5, 11, 2.

Note, this is a corner case, because if we stopped processing one record 
earlier, i.e., after processing `r2` but before processing `r3`, stream-time 
would advance correctly from a global point of view.
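
The example can be replayed with a small simulation. This is only a sketch of 
the drop rule as stated above (timestamp < stream-time - grace-period); the 
class StreamTimeDemo and the method dropped are hypothetical names and do not 
correspond to Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of stream-time tracking and late-record dropping.
final class StreamTimeDemo {
    static final long UNKNOWN = -1L;

    // Processes the timestamps in order, starting from initialStreamTime,
    // and returns the timestamps of all records dropped as late.
    static List<Long> dropped(long initialStreamTime, long gracePeriod, long... timestamps) {
        long streamTime = initialStreamTime;
        List<Long> dropped = new ArrayList<>();
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts); // stream-time never moves backwards
            if (ts < streamTime - gracePeriod) {
                dropped.add(ts);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Uninterrupted run over r1..r4: r4 is dropped.
        System.out.println(dropped(UNKNOWN, 5, 0, 5, 11, 2)); // prints [2]
        // Restart after r3 with stream-time forgotten: r4 is processed.
        System.out.println(dropped(UNKNOWN, 5, 2));           // prints []
        // Restart after r3 with stream-time restored to 11: r4 is dropped again.
        System.out.println(dropped(11, 5, 2));                // prints [2]
    }
}
```

The first and third runs agree, which is the deterministic behavior we want; 
the second run shows the discrepancy caused by forgetting stream-time.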

A potential fix would be to store the latest observed partition-time in the 
metadata of committed offsets. This way, on restart/rebalance we can 
re-initialize time correctly.


> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
>                 Key: KAFKA-7994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>             Fix For: 2.4.0
>
>         Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Before 2.3 this was used to determine the logical 
> stream-time, which decides whether out-of-order records are processed or 
> dropped as late (i.e., timestamp < stream-time - grace-period). 
> Preserving the stream-time is necessary to ensure deterministic results (see 
> KAFKA-9368), and although the processor-time is now used instead of 
> partition-time, preserving the partition-time is a first step towards 
> improving the overall stream-time semantics.
> The partition-time is also used by the TimestampExtractor. It gets passed 
> into #extract and can be used to determine a rough timestamp estimate if the 
> actual timestamp is missing, corrupt, etc. Because partition-time is currently 
> lost on rebalance/restart, this means that in the corner case where the next 
> record to be processed after a rebalance/restart cannot have its actual 
> timestamp determined, we have no way of coming up with a reasonable guess and 
> the record will likely have to be dropped.
>  
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize partition-time correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
