Ayoub Omari created KAFKA-16925:
-----------------------------------

Summary: stream-table join does not immediately forward expired records on restart
Key: KAFKA-16925
URL: https://issues.apache.org/jira/browse/KAFKA-16925
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Ayoub Omari
Assignee: Ayoub Omari
[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join] introduced a grace period for KStreamKTableJoin. It allows joining a stream to a KTable backed by a versioned state store. When a record is received, it is put in a buffer until the grace period has elapsed. Once the grace period elapses, the record is joined with its most recent match from the versioned state store. +Late records+ are +not+ put in the buffer and are joined immediately.

{quote}
If the grace period is non zero, the record will enter a stream buffer and will dequeue when the record timestamp is less than or equal to stream time minus the grace period. Late records, out of the grace period, will be executed right as they come in. (KIP-923)
{quote}

However, this is not what happens today after a rebalance or restart. The reason is that observedStreamTime is kept in an in-memory variable of the processor, which is lost on rebalance/restart: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]

If the task restarts and then receives an expired record, observedStreamTime starts over, so the processor treats that record's timestamp as the maximum stream time observed so far. The record is therefore put in the buffer instead of being joined immediately.

{*}Example{*}:
* Grace period = 60s
* KTable contains (key, rightValue)

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T       : put in buffer
streamInput2 (key, value2) <--- time = T - 60s : immediately joined
// streamTime = T
{code}

+Scenario with rebalance+
{code:java}
streamInput1 (key, value1) <--- time = T       : put in buffer
// --- rebalance ---
streamInput2 (key, value2) <--- time = T - 60s : put in buffer
// streamTime = T - 60s
{code}

The processor should instead use currentStreamTime from the context, which is recovered on restart.
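To make the difference concrete, here is a minimal, dependency-free sketch. It is not Kafka code: GraceJoinModel, StreamTimeTracker, FieldTimeProcessor and ContextTimeProcessor are hypothetical names. It reduces the processor to the grace-period decision only and assumes a record is late when its timestamp is less than or equal to stream time minus the grace period, following the KIP-923 wording quoted above. A restart is modelled by creating a new processor instance, and StreamTimeTracker is an assumed stand-in for the task-level stream time that the context exposes and that survives the restart.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, dependency-free model of the grace-period decision in a
 * stream-table join. This is NOT Kafka code; all names are illustrative.
 */
public class GraceJoinModel {

    enum Action { BUFFER, JOIN_IMMEDIATELY }

    /** Assumed stand-in for task-level stream time that is recovered on restart. */
    static final class StreamTimeTracker {
        private long streamTime = -1L; // -1: no stream time observed yet (model assumption)

        void observe(long ts) { streamTime = Math.max(streamTime, ts); }

        long get() { return streamTime; }
    }

    /** Simplified rule from the KIP-923 wording: late once ts <= streamTime - grace. */
    static Action decide(long ts, long streamTime, long graceMs) {
        return ts <= streamTime - graceMs ? Action.JOIN_IMMEDIATELY : Action.BUFFER;
    }

    /** Mirrors the reported behaviour: stream time lives in a field, so a new instance starts over. */
    static final class FieldTimeProcessor {
        private final long graceMs;
        private long observedStreamTime = -1L;

        FieldTimeProcessor(long graceMs) { this.graceMs = graceMs; }

        Action process(long ts) {
            observedStreamTime = Math.max(observedStreamTime, ts);
            return decide(ts, observedStreamTime, graceMs);
        }
    }

    /** Proposed behaviour: stream time is read from an external source that outlives the instance. */
    static final class ContextTimeProcessor {
        private final long graceMs;
        private final LongSupplier currentStreamTime;

        ContextTimeProcessor(long graceMs, LongSupplier currentStreamTime) {
            this.graceMs = graceMs;
            this.currentStreamTime = currentStreamTime;
        }

        Action process(long ts) {
            return decide(ts, currentStreamTime.getAsLong(), graceMs);
        }
    }

    public static void main(String[] args) {
        final long grace = 60_000L; // 60s grace period
        final long t = 1_000_000L;  // arbitrary value for T

        // Field-based: a restart is modelled as a fresh processor instance.
        FieldTimeProcessor beforeRestart = new FieldTimeProcessor(grace);
        System.out.println(beforeRestart.process(t));        // BUFFER
        FieldTimeProcessor afterRestart = new FieldTimeProcessor(grace);
        System.out.println(afterRestart.process(t - grace)); // BUFFER (should have been joined)

        // Context-based: the tracker outlives the processor instances.
        StreamTimeTracker tracker = new StreamTimeTracker();
        tracker.observe(t);
        System.out.println(new ContextTimeProcessor(grace, tracker::get).process(t)); // BUFFER
        tracker.observe(t - grace); // stream time stays at T
        System.out.println(new ContextTimeProcessor(grace, tracker::get).process(t - grace)); // JOIN_IMMEDIATELY
    }
}
```

The two processors share the same lateness rule and differ only in where stream time comes from, which is the substance of the proposal: read stream time from the context instead of keeping it inside the processor.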