[ https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sophie Blee-Goldman updated KAFKA-7994:
---------------------------------------
    Description:
We compute a per-partition partition-time as the maximum timestamp over all records processed so far. Before 2.3 this was used to determine the logical stream-time used to make decisions about processing out-of-order records or dropping them if they are late (ie, timestamp < stream-time - grace-period). Preserving the stream-time is necessary to ensure deterministic results (see KAFKA-9368), and although the processor-time is now used instead of partition-time, preserving the partition-time is a first step towards improving the overall stream-time semantics.

The partition-time is also used by the TimestampExtractor. It gets passed in to #extract and can be used to determine a rough timestamp estimate if the actual timestamp is missing, corrupt, etc. This means that in the corner case where the next record to be processed after a rebalance/restart cannot have its actual timestamp determined, we have no way of coming up with a reasonable guess and the record will likely have to be dropped.

A potential fix would be to store the latest observed partition-time in the metadata of committed offsets. This way, on restart/rebalance we can re-initialize partition-time correctly.

  was:
We compute a per-partition partition-time as the maximum timestamp over all records processed so far. Furthermore, we use partition-time to compute stream-time for each task as the maximum over all partition-times (for all corresponding task partitions). This stream-time is used to make decisions about processing out-of-order records or dropping them if they are late (ie, timestamp < stream-time - grace-period).

During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, -1) for tasks that are newly created (or migrated).
In net effect, we forget the current stream-time in this case, which may lead to non-deterministic behavior if we stop processing right before a late record that would be dropped if we continued processing, but is not dropped after a rebalance/restart.

Let's look at an example with a grace period of 5ms for a tumbling window of 5ms, and the following records (timestamps in parentheses):

{code:java}
r1(0) r2(5) r3(11) r4(2){code}

In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or rebalance after processing `r3` but before processing `r4`, we would reinitialize stream-time as -1, and thus would process `r4` on restart/after rebalance. The problem is that stream-time then advances differently from a global point of view: 0, 5, 11, 2.

Note, this is a corner case, because if we stopped processing one record earlier, ie, after processing `r2` but before processing `r3`, stream-time would advance correctly from a global point of view.

A potential fix would be to store the latest observed partition-time in the metadata of committed offsets. This way, on restart/rebalance we can re-initialize time correctly.


> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
>                 Key: KAFKA-7994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>             Fix For: 2.4.0
>
>         Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all
> records processed so far. Before 2.3 this was used to determine the logical
> stream-time used to make decisions about processing out-of-order records or
> dropping them if they are late (ie, timestamp < stream-time - grace-period).
> Preserving the stream-time is necessary to ensure deterministic results (see
> KAFKA-9368), and although the processor-time is now used instead of
> partition-time, preserving the partition-time is a first step towards
> improving the overall stream-time semantics.
> The partition-time is also used by the TimestampExtractor. It gets passed in
> to #extract and can be used to determine a rough timestamp estimate if the
> actual timestamp is missing, corrupt, etc. This means that in the corner case
> where the next record to be processed after a rebalance/restart cannot have
> its actual timestamp determined, we have no way of coming up with a
> reasonable guess and the record will likely have to be dropped.
>
> A potential fix would be to store the latest observed partition-time in the
> metadata of committed offsets. This way, on restart/rebalance we can
> re-initialize partition-time correctly.
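
To make the r1(0) r2(5) r3(11) r4(2) example from the earlier description concrete, here is a minimal, self-contained sketch in plain Java. It does not use any Kafka Streams API: the class `StreamTimeSketch`, the method `droppedRecords`, and its parameters are hypothetical names chosen for illustration, and the "committed" time is modeled as a plain variable standing in for a value that would be stored in the committed offset metadata. It only assumes the lateness rule quoted above (timestamp < stream-time - grace-period) and the UNKNOWN (-1) initial value.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamTimeSketch {
    // Assumption: stream-time starts as UNKNOWN (-1), as described in the issue.
    static final long UNKNOWN = -1L;

    /**
     * Simulates processing records in order and returns the timestamps of
     * records dropped as late (timestamp < stream-time - grace-period).
     *
     * restartBeforeIndex: index of the record before which a restart/rebalance
     *                     happens, or -1 for no restart.
     * restoreTime:        if true, the restart re-initializes stream-time from
     *                     the last "committed" value (the proposed fix);
     *                     if false, stream-time is reset to UNKNOWN.
     */
    static List<Long> droppedRecords(long[] timestamps, long gracePeriodMs,
                                     int restartBeforeIndex, boolean restoreTime) {
        List<Long> dropped = new ArrayList<>();
        long streamTime = UNKNOWN;
        long committedTime = UNKNOWN; // stands in for time kept with the committed offset

        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartBeforeIndex) {
                streamTime = restoreTime ? committedTime : UNKNOWN;
            }
            long ts = timestamps[i];
            // Stream-time is the maximum timestamp observed so far.
            streamTime = Math.max(streamTime, ts);
            if (ts < streamTime - gracePeriodMs) {
                dropped.add(ts);
            }
            // Simplification: "commit" after every record.
            committedTime = streamTime;
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L};
        System.out.println("no restart:             " + droppedRecords(records, 5L, -1, false));
        System.out.println("restart, time forgotten: " + droppedRecords(records, 5L, 3, false));
        System.out.println("restart, time restored:  " + droppedRecords(records, 5L, 3, true));
    }
}
```

Without a restart, `r4` is dropped; with a restart before `r4` and stream-time reset to -1, `r4` is processed instead; with the time restored on restart, the result matches the uninterrupted run again. Committing after every record is a simplification; a real application commits periodically, so the restored value would be the time as of the last commit.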