[ https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854483#comment-17854483 ]
Ayoub Omari commented on KAFKA-16925:
-------------------------------------

> Yes, but it's tracked on a per-task basis, and thus, it could be ahead of the
> time the operator tracked

My understanding is that the processor here wants to know the maximum stream time observed so far (including the current record), and context.currentStreamTimeMs() is set from the timestamps of input records. To me, that is exactly the information this processor is looking for?

I am not talking about the general case, where I agree that it is better to put the logic in the store itself, or somewhere else that is reusable across processors.

In this particular case, the processor maintains a buffer and wants to evict from it periodically. Since it is the processor that drives the eviction, and the buffer has no eviction policy of its own (unlike window stores, for example), I was thinking it could make sense to maintain this information in the processor rather than in the buffer?

> stream-table join does not immediately forward expired records on restart
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Ayoub Omari
>            Assignee: Ayoub Omari
>            Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
> introduced a grace period for KStreamKTableJoin. This allows joining a stream
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has elapsed.
> When the grace period elapses, the record is joined with its most recent
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and
> will dequeue when the record timestamp is less than or equal to stream time
> minus the grace period. Late records, out of the grace period, will be
> executed right as they come in. (KIP-923){code}
>
> However, this is not the case today on rebalance or restart. The reason is
> that observedStreamTime is taken from the underlying state store, which loses
> this information on rebalance/restart:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>
> If the task restarts and then receives an expired record, the buffer treats
> that record's timestamp as the maximum stream time observed so far, and puts
> the record in the buffer instead of joining it immediately.
>
> {*}Example{*}:
> * Grace period = 60s
> * KTable contains (key, rightValue)
>
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
>
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
>
> The processor should use currentStreamTime from the context instead, which is
> recovered on restart.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)