Hugo Ricateau created FLINK-37633:
-------------------------------------

             Summary: Improved state management for temporal join operator
                 Key: FLINK-37633
                 URL: https://issues.apache.org/jira/browse/FLINK-37633
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Configuration, Runtime / Queryable State, 
Stateful Functions, Table SQL / Runtime
    Affects Versions: 1.20.1, 1.19.2, 2.0.0
            Reporter: Hugo Ricateau


When performing a temporal join between a left stream and a right versioned 
table, as the watermark of the join operator progresses, the deprecated entries 
of the right table (the updated rows whose end of validity precedes the 
operator watermark) are purged from the state; this is documented 
[here|https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/joins/#event-time-temporal-join],
 in the last paragraph (emphasis mine):
{quote}In contrast to regular joins, the previous temporal table results will 
not be affected despite the changes on the build side. Compared to interval 
joins, temporal table joins do not define a time window within which the 
records will be joined. Records from the probe side are always joined with the 
build side’s version at the time specified by the time attribute. Thus, rows on 
the build side might be arbitrarily old. As time passes, {*}no longer needed 
versions of the record (for the given primary key) will be removed from the 
state{*}.
{quote}
However, while this feature is important to keep the state size under control, 
it imposes constraints that are not suitable for all use cases.

In my use case, the left stream is heavily out of order with respect to 
business time: events actually arrive in order, but their business validity 
time might lie far in the past, and is in any case older than the event 
timestamp. The upsert right stream, on the other hand, produces a versioned 
table in which each version of a row is valid from its update event time until 
the next update.
As order does not matter on the left stream, and as we would like to release 
its events as soon as possible, we declared the watermark as {{{}WATERMARK FOR 
<business-validity-time> AS <event-time>{}}}. But since the state of the join 
operator does not retain "deprecated" versions of the rows in the right table, 
some events on the left stream are consequently unable to find the appropriate 
right-side row to join with, due to their "old" business validity time.
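
To illustrate the failure mode, here is a minimal, self-contained sketch of a 
single right-side key whose superseded versions are purged as the watermark 
advances. It is a simplified model, not Flink's actual state layout: the class 
{{VersionedRowSketch}} and its methods are hypothetical names introduced only 
for this example, and timestamps are plain longs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical, simplified model of the versions kept for one right-side key. */
final class VersionedRowSketch {
    // Versions of the row, keyed by the event time from which each one is valid.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void upsert(long validFrom, String value) {
        versions.put(validFrom, value);
    }

    /** Removes every version superseded at or before the watermark; keeps the one valid at it. */
    void purge(long watermark) {
        Long latestValid = versions.floorKey(watermark);
        if (latestValid != null) {
            versions.headMap(latestValid, false).clear();
        }
    }

    /** Returns the version valid at the given time, if it is still in the state. */
    Optional<String> lookup(long time) {
        Map.Entry<Long, String> entry = versions.floorEntry(time);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        VersionedRowSketch row = new VersionedRowSketch();
        row.upsert(100L, "v1");
        row.upsert(200L, "v2");
        row.purge(250L); // the watermark has passed the end of validity of v1

        // A left event whose business validity time is 150 should join with v1,
        // but v1 has already been purged, so no match is found.
        System.out.println(row.lookup(150L).isPresent()); // prints "false"
    }
}
```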

David Anderson suggested the following workaround to deal with such situations: 
fixing the watermark of the right stream to a constant timestamp far in the 
future ensures that no version of a row of the right table is ever purged 
from the state (as the watermark of the join operator will always be way behind 
the watermark of the events of the right stream). As a result, the watermark of 
the join operator follows the one of the left stream, releasing its events as 
they arrive. However, there are two severe drawbacks to this solution:
 * As the watermark on the right stream has been fixed far in the future, it 
breaks the ability to preserve order between the events of the left and right 
streams (e.g. when they come from a CDC connector and insert operations on the 
right table necessarily precede those on the left table to satisfy foreign-key 
constraints). Another workaround is to introduce an artificial delay on the 
left stream, to increase the likelihood that right-side events will be 
processed before the corresponding left-side ones; but this offers no 
guarantees and introduces undesirable lag in the pipeline.
 * Moreover, this all-or-nothing retention policy for "deprecated" versions of 
the right table lacks finer-grained control, such as purging only the versions 
that have been deprecated for longer than a configurable grace period.

I would like to suggest {*}adding a configuration value that allows delaying 
removal of deprecated versions of a record from the state{*}.

If you agree with this proposal, I volunteer to implement it; here is a draft 
of how I would do it:
 * Add a {{long outdatedRetentionTime}} parameter to the 
{{TemporalRowTimeJoinOperator}} class 
[constructor|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L126-L143];
 * Use this interval to delay the passed {{currentWatermark}} in [this 
call|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L273]
 to {{cleanupExpiredVersionInState}} (a negative value disabling the cleanup 
entirely);
 * Add a {{table.exec.state.outdated-versions.retention}} configuration value 
[here|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java#L60];
 * And pass it to the {{TemporalRowTimeJoinOperator}} 
[constructor|https://github.com/apache/flink/blob/5aaff07fe925b289a2cabe9269a015ebc2255223/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java#L268-L276],
 following the same pattern as for {{{}minRetentionTime{}}}.
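
The core of the draft above could be sketched as follows. This is a 
standalone illustration in plain Java, not a patch against 
{{TemporalRowTimeJoinOperator}}: the class {{RetentionCleanupSketch}}, the 
helper {{cleanupThreshold}} and the {{TreeMap}}-based state are assumptions 
made for the example; only the {{outdatedRetentionTime}} parameter name comes 
from the proposal.

```java
import java.util.TreeMap;

/** Hypothetical sketch of delaying the cleanup of outdated versions by a retention time. */
final class RetentionCleanupSketch {

    /**
     * Returns the timestamp to use for cleanup instead of the raw watermark.
     * A negative retention disables the cleanup (threshold is Long.MIN_VALUE).
     */
    static long cleanupThreshold(long currentWatermark, long outdatedRetentionTime) {
        if (outdatedRetentionTime < 0) {
            return Long.MIN_VALUE;
        }
        long delayed = currentWatermark - outdatedRetentionTime;
        // Guard against underflow, e.g. when the watermark is still Long.MIN_VALUE.
        return delayed > currentWatermark ? Long.MIN_VALUE : delayed;
    }

    /** Removes versions superseded at or before the delayed watermark; keeps the one valid at it. */
    static void cleanup(TreeMap<Long, String> versions, long currentWatermark, long outdatedRetentionTime) {
        long threshold = cleanupThreshold(currentWatermark, outdatedRetentionTime);
        Long latestValid = versions.floorKey(threshold);
        if (latestValid != null) {
            versions.headMap(latestValid, false).clear();
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(100L, "v1");
        versions.put(200L, "v2");
        versions.put(300L, "v3");

        // With a retention of 100, the cleanup behaves as if the watermark were 250:
        // v1 is removed, while v2 is kept although v3 has already superseded it.
        cleanup(versions, 350L, 100L);
        System.out.println(versions.keySet()); // prints "[200, 300]"
    }
}
```

With a retention of 0 the behaviour of this sketch matches an undelayed 
cleanup, which would keep the default backward compatible.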



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
