Hugo Ricateau created FLINK-37633:
-------------------------------------

Summary: Improved state management for temporal join operator
Key: FLINK-37633
URL: https://issues.apache.org/jira/browse/FLINK-37633
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration, Runtime / Queryable State, Stateful Functions, Table SQL / Runtime
Affects Versions: 1.20.1, 1.19.2, 2.0.0
Reporter: Hugo Ricateau
When performing a temporal join between a left stream and a right versioned table, the join operator purges the deprecated entries of the right table (the updated rows whose end of validity precedes the operator watermark) from its state as its watermark progresses. This is documented [here|https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/joins/#event-time-temporal-join], in the last paragraph (emphasis mine):

{quote}In contrast to regular joins, the previous temporal table results will not be affected despite the changes on the build side. Compared to interval joins, temporal table joins do not define a time window within which the records will be joined. Records from the probe side are always joined with the build side’s version at the time specified by the time attribute. Thus, rows on the build side might be arbitrarily old. As time passes, *no longer needed versions of the record (for the given primary key) will be removed from the state*.{quote}

However, while this feature is important to keep the state size under control, it imposes constraints that do not suit all use cases. In my use case, on the one hand, the left stream is subject to strong disorder (events actually arrive in order, but their business validity time may lie far in the past, and is always earlier than the event timestamp); on the other hand, the upsert right stream produces a versioned table in which each version of a row is valid from the event time of its update up to the next update. Since order does not matter on the left stream, and since we want to release its events as soon as possible, we declared its watermark as {{WATERMARK FOR <business-validity-time> AS <event-time>}}.
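To make the purge rule concrete, here is a simplified, self-contained Java model of the right-side state kept for a single primary key. It assumes versions are keyed by their start-of-validity timestamp and that, when the watermark advances, only the latest version starting at or before the watermark and all newer versions are kept. This mirrors the documented behaviour but is not Flink's actual {{TemporalRowTimeJoinOperator}} code; the class {{VersionedRowState}} and its methods are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of the build-side state kept for one primary key
 * by an event-time temporal join (not Flink's actual implementation).
 */
final class VersionedRowState {
    // start-of-validity timestamp -> row payload, sorted in ascending time order
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void upsert(long validFrom, String row) {
        versions.put(validFrom, row);
    }

    /** Drops every version whose validity ended before the watermark. */
    void purge(long watermark) {
        // The latest version starting at or before the watermark is still valid at the
        // watermark, so it is kept; every strictly older version has expired.
        Long keepFrom = versions.floorKey(watermark);
        if (keepFrom != null) {
            versions.headMap(keepFrom, false).clear();
        }
    }

    /** Returns the version valid at the given time, or null if none is (still) in state. */
    String lookup(long time) {
        Map.Entry<Long, String> e = versions.floorEntry(time);
        return e == null ? null : e.getValue();
    }

    int size() {
        return versions.size();
    }

    public static void main(String[] args) {
        VersionedRowState state = new VersionedRowState();
        state.upsert(10L, "v1"); // valid on [10, 20)
        state.upsert(20L, "v2"); // valid on [20, 30)
        state.upsert(30L, "v3"); // valid from 30 onwards

        System.out.println(state.lookup(15L)); // v1, before any purge
        state.purge(25L);                      // v1 ended at 20 < 25, so it is dropped
        System.out.println(state.lookup(15L)); // null: a late probe-side row finds no match
        System.out.println(state.lookup(25L)); // v2
    }
}
```

In this model, a left row whose business validity time is 15 joins with {{v1}} only if it is processed before the watermark passes 20; afterwards it finds nothing, which is exactly the situation described above.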
But since the join operator's state does not retain "deprecated" versions of the right table's rows, some events of the left stream cannot find the right-side row they should be joined with, because of their “old” business validity time.

David Anderson suggested the following workaround for such situations: fixing the watermark of the right stream to a constant timestamp far in the future ensures that no version of a right-table row is ever purged from the state (as the watermark of the join operator will always lag far behind the watermark of the right stream's events). As a result, the watermark of the join operator follows that of the left stream, releasing its events as they arrive. However, this solution has two severe drawbacks:
* As the watermark of the right stream has been fixed far in the future, the ability to preserve order between the events of the left and right streams is lost (e.g. when both arise from a CDC connector and insert operations on the right table necessarily precede those on the left table to fulfil foreign-key constraints). Another workaround is to introduce an artificial delay on the left stream, to increase the likelihood that right-side events are processed before the corresponding left-side ones; but this offers no guarantee and introduces undesirable lag in the pipeline.
* Moreover, this all-or-nothing retention policy for "deprecated" versions of the right table lacks finer-grained control, such as purging only versions that have been deprecated for longer than a configurable grace period.

I would like to suggest *adding a configuration value that allows delaying the removal of deprecated versions of a record from the state*.
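The workaround relies on the fact that a two-input operator's event-time clock is the minimum of its input watermarks. The toy Java model below (the class {{TwoInputWatermark}} is hypothetical, and {{Long.MAX_VALUE}} stands in for the "far in the future" right-side watermark) illustrates why the join's watermark then simply follows the left stream, and therefore why a left row can be released before the right-side row it depends on has been processed.

```java
/**
 * Hypothetical toy model of how a two-input operator combines watermarks:
 * its event-time clock is the minimum of the latest watermark seen on each input.
 */
final class TwoInputWatermark {
    private long left = Long.MIN_VALUE;
    private long right = Long.MIN_VALUE;

    // Watermarks are monotonic: a smaller value never moves an input's clock backwards.
    void onLeftWatermark(long wm) { left = Math.max(left, wm); }

    void onRightWatermark(long wm) { right = Math.max(right, wm); }

    long current() { return Math.min(left, right); }

    public static void main(String[] args) {
        // Normal setup: the join waits for both sides.
        TwoInputWatermark normal = new TwoInputWatermark();
        normal.onLeftWatermark(100L);
        normal.onRightWatermark(40L); // the right side is lagging
        System.out.println(normal.current()); // 40: left rows after 40 wait for the right side

        // Workaround: the right-side watermark is pinned far in the future.
        TwoInputWatermark pinned = new TwoInputWatermark();
        pinned.onRightWatermark(Long.MAX_VALUE);
        pinned.onLeftWatermark(100L);
        System.out.println(pinned.current()); // 100: left rows are released regardless of the right side
    }
}
```

With the pinned right-side watermark, nothing makes the operator wait for right-side inserts that logically precede a left-side event, which is the loss of ordering described in the first drawback.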
If you agree with this improvement proposal, I volunteer to implement it; here is a draft of how I would do it:
* Add a {{long outdatedRetentionTime}} parameter to the {{TemporalRowTimeJoinOperator}} class [constructor|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L126-L143];
* Use this interval to delay the {{currentWatermark}} passed to {{cleanupExpiredVersionInState}} in [this call|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L273] (a negative delay completely disabling the feature);
* Add a {{table.exec.state.outdated-versions.retention}} configuration value [here|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java#L60];
* Pass it to the {{TemporalRowTimeJoinOperator}} [constructor|https://github.com/apache/flink/blob/5aaff07fe925b289a2cabe9269a015ebc2255223/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java#L268-L276], following the same pattern as for {{minRetentionTime}}.
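The second bullet of the draft essentially amounts to computing a delayed watermark before invoking the cleanup routine. The sketch below is a hypothetical illustration, not code from the Flink repository: the class {{OutdatedVersionRetention}} and the helper {{delayedCleanupWatermark}} are invented names, the retention is assumed to be expressed in the same unit as the watermark (milliseconds), and {{Long.MIN_VALUE}} is used as the "purge nothing" sentinel when the retention is negative. It uses saturating subtraction so that the initial {{Long.MIN_VALUE}} watermark cannot overflow.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of the proposed grace period for purging outdated versions. */
final class OutdatedVersionRetention {

    /**
     * Watermark to hand to the cleanup routine instead of the operator's current watermark.
     * A negative retention disables cleanup entirely by returning Long.MIN_VALUE.
     */
    static long delayedCleanupWatermark(long currentWatermark, long outdatedRetentionTime) {
        if (outdatedRetentionTime < 0) {
            return Long.MIN_VALUE; // no version starts before this, so nothing is purged
        }
        // Saturating subtraction: if the result wrapped around, clamp it to Long.MIN_VALUE.
        long delayed = currentWatermark - outdatedRetentionTime;
        return delayed > currentWatermark ? Long.MIN_VALUE : delayed;
    }

    /** Keeps the latest version starting at or before the watermark plus all newer ones. */
    static void purge(NavigableMap<Long, String> versions, long watermark) {
        Long keepFrom = versions.floorKey(watermark);
        if (keepFrom != null) {
            versions.headMap(keepFrom, false).clear();
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> versions = new TreeMap<>();
        versions.put(10L, "v1"); // valid on [10, 20)
        versions.put(20L, "v2"); // valid on [20, 30)
        versions.put(30L, "v3"); // valid from 30 onwards

        // Operator watermark is 35, but outdated versions are retained for 20 more time units.
        purge(versions, delayedCleanupWatermark(35L, 20L)); // cleans up to 15
        System.out.println(versions.keySet()); // [10, 20, 30]: v1 is still valid at 15

        purge(versions, delayedCleanupWatermark(35L, 10L)); // cleans up to 25
        System.out.println(versions.keySet()); // [20, 30]
    }
}
```

Delaying only the watermark handed to the cleanup leaves the operator's own event-time clock (and hence when left rows are emitted) unchanged, so the grace period trades extra state for tolerance of late probe-side rows without adding output latency.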