Ayoub Omari created KAFKA-16925:
-----------------------------------

             Summary: stream-table join does not immediately forward expired 
records on restart
                 Key: KAFKA-16925
                 URL: https://issues.apache.org/jira/browse/KAFKA-16925
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Ayoub Omari
            Assignee: Ayoub Omari


[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
 introduced a grace period for KStreamKTableJoin. This allows joining a stream with 
a KTable backed by a versioned state store.

An incoming stream record is put in a buffer until the grace period has elapsed. 
It is then joined with its most recent match from the versioned state store.

+Late records+ are +not+ put in the buffer and are immediately joined.

 
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will 
dequeue when the record timestamp is less than or equal to stream time minus 
the grace period.  Late records, out of the grace period, will be executed 
right as they come in. (KIP-923){code}
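
To make the buffering rule concrete, here is a minimal sketch of it. It is not the Kafka Streams implementation: the class GraceBufferSketch and its process method are hypothetical, records are reduced to their timestamps, and stream time is assumed to be the maximum timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of the grace-period buffer; not the actual Kafka Streams code.
class GraceBufferSketch {
    private final long graceMs;
    private long streamTime = -1L; // -1 means "no record observed yet"
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // ordered by timestamp

    GraceBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Returns the timestamps of the records that get joined as a result of this call. */
    List<Long> process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        List<Long> joined = new ArrayList<>();
        if (timestamp <= streamTime - graceMs) {
            // Late record: it skips the buffer and is joined immediately.
            joined.add(timestamp);
            return joined;
        }
        buffer.add(timestamp);
        // Dequeue every buffered record whose grace period has elapsed.
        while (!buffer.isEmpty() && buffer.peek() <= streamTime - graceMs) {
            joined.add(buffer.poll());
        }
        return joined;
    }

    public static void main(String[] args) {
        GraceBufferSketch sketch = new GraceBufferSketch(60_000L);
        System.out.println(sketch.process(100_000L)); // [] (buffered)
        System.out.println(sketch.process(40_000L));  // [40000] (late, joined immediately)
        System.out.println(sketch.process(160_000L)); // [100000] (grace period elapsed)
    }
}
```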
 

However, this does not hold today after a rebalance or restart: late records 
are not joined immediately. The reason is that observedStreamTime is kept in an 
in-memory variable of the processor, whose value is lost on rebalance/restart: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]

 

If the task restarts and then receives an expired record, the processor treats 
that record's timestamp as the maximum stream time observed so far and puts the 
record in the buffer instead of joining it immediately.

 

{*}Example{*}:
 * Grace period = 60s
 * KTable contains (key, rightValue)

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
+Scenario with rebalance+

 
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

// --- rebalance ---

streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
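
Both scenarios can be reproduced with a small simulation. This is a simplified sketch, not the real KStreamKTableJoinProcessor: the class JoinProcessorSketch and its process method are hypothetical, and only the field name observedStreamTime follows the source. A restart or rebalance is modeled by creating a new instance, which resets the field.

```java
// Simplified model of the reported behavior; not the actual processor.
class JoinProcessorSketch {
    private static final long NO_TIMESTAMP = -1L;

    private final long graceMs;
    // Held in memory only: a new instance (restart/rebalance) starts again from NO_TIMESTAMP.
    private long observedStreamTime = NO_TIMESTAMP;

    JoinProcessorSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Returns "joined" for a late record and "buffered" otherwise. */
    String process(long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        boolean late = timestamp <= observedStreamTime - graceMs;
        return late ? "joined" : "buffered";
    }

    public static void main(String[] args) {
        long t = 1_000_000L;
        long grace = 60_000L;

        // Normal scenario: stream time is T when streamInput2 arrives.
        JoinProcessorSketch processor = new JoinProcessorSketch(grace);
        System.out.println(processor.process(t));         // buffered
        System.out.println(processor.process(t - grace)); // joined

        // Scenario with rebalance: the new instance has forgotten stream time T.
        JoinProcessorSketch restarted = new JoinProcessorSketch(grace);
        System.out.println(restarted.process(t - grace)); // buffered
    }
}
```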
 

The processor should instead use currentStreamTime from the Context, which is 
recovered on restart.
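
A rough sketch of the proposed direction, under stated assumptions: StreamTimeContext is a hypothetical interface standing in for the processor context (it is not a Kafka Streams type), and the sketch assumes the context still reports stream time T after the restart.

```java
// Hypothetical stand-in for the processor context; not a Kafka Streams type.
interface StreamTimeContext {
    long currentStreamTime();
}

// Sketch of a processor that reads stream time from the context instead of a local field.
class ContextBackedJoinSketch {
    private final long graceMs;
    private final StreamTimeContext context;

    ContextBackedJoinSketch(long graceMs, StreamTimeContext context) {
        this.graceMs = graceMs;
        this.context = context;
    }

    /** Returns "joined" for a late record and "buffered" otherwise. */
    String process(long timestamp) {
        // Defensive max, in case the context value does not yet include this record.
        long streamTime = Math.max(context.currentStreamTime(), timestamp);
        return timestamp <= streamTime - graceMs ? "joined" : "buffered";
    }

    public static void main(String[] args) {
        long t = 1_000_000L;
        long grace = 60_000L;

        // Assumption for this sketch: stream time T was recovered after the restart.
        StreamTimeContext recovered = () -> t;
        ContextBackedJoinSketch restarted = new ContextBackedJoinSketch(grace, recovered);
        System.out.println(restarted.process(t - grace)); // joined
    }
}
```

With stream time taken from the context, the expired record is recognized as late even though the processor instance itself is new.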

 



