dawidwys commented on code in PR #26527: URL: https://github.com/apache/flink/pull/26527#discussion_r2075417209
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java: ##########
@@ -67,14 +69,27 @@ public interface CallContext {
     /**
      * Returns information about the table that has been passed to a table argument.
      *
-     * <p>Semantics are only available for table arguments (i.e. arguments of a {@link
-     * ProcessTableFunction} that are annotated with {@code @ArgumentHint(TABLE_AS_SET)} or
-     * {@code @ArgumentHint(TABLE_AS_ROW)}).
+     * <p>This method applies only to {@link ProcessTableFunction}s.
+     *
+     * <p>Semantics are only available for table arguments that are annotated with
+     * {@code @ArgumentHint(TABLE_AS_SET)} or {@code @ArgumentHint(TABLE_AS_ROW)}).
      */
     default Optional<TableSemantics> getTableSemantics(int pos) {
         return Optional.empty();
     }
+    /**
+     * Returns the {@link ChangelogMode} that the framework requires from the function.
+     *
+     * <p>This method applies only to {@link ProcessTableFunction}s.

Review Comment:
```suggestion
     * <p>This method applies only to {@link ProcessTableFunction ProcessTableFunctions}.
```

########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java: ##########
@@ -112,14 +109,18 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
     private final List<ChangelogMode> inputChangelogModes;
+    @JsonProperty(FIELD_NAME_REQUIRED_CHANGELOG_MODE)
+    private final ChangelogMode requiredChangelogMode;

Review Comment:
nit: wondering if `outputChangelogMode` isn't easier to understand?
########## docs/content/docs/dev/table/functions/ptfs.md: ##########
@@ -1093,6 +1093,359 @@ This allows the PTF to focus on the main aggregation without the need to manuall
 {{< top >}}
+Updates and Changelogs
+----------------------
+
+By default, PTFs assume that table arguments are backed by append-only tables, where new records are inserted to the table
+without any updates to existing records. PTFs then produce new append-only tables as output.
+
+While append-only tables are ideal and work seamlessly with event-time and watermarks, there are scenarios that require
+working with updating tables. In these cases, records can be updated or deleted after their initial insertion.
+This impacts several aspects:
+
+- **State Management**: Operations must accommodate the possibility that any record can be updated again, potentially requiring
+  a larger state footprint.
+- **Pipeline Complexity**: Since records are not final and can be changed subsequently, the entire pipeline result remains
+  in-flight.
+- **Downstream Systems**: In-flight data can lead to issues, not only in Flink but also in downstream systems where consistency
+  and finality of data are critical.
+
+{{< hint info >}}
+For efficient and high-performance data processing, it is recommended to design pipelines using append-only tables whenever
+feasible to simplify state management and avoid complexities associated with updating tables.
+{{< /hint >}}
+
+A PTF can consume and/or produce updating tables if it is configured to do so. This section provides a brief overview of
+CDC (Change Data Capture) with PTFs.
+
+### Change Data Capture Basics
+
+Under the hood, tables in Flink's SQL engine are backed by changelogs. These changelogs encode CDC (Change Data Capture)
+information containing *INSERT* (`+I`), *UPDATE_BEFORE* (`-U`), *UPDATE_AFTER* (`+U`), or *DELETE* (`-D`) messages.
+ +The existence of these flags in the changelog constitutes the *Changelog Mode* of a consumer or producer: + +**Append Mode `{+I}`** +- All messages are insert-only. +- Every insertion message is an immutable fact. +- Messages can be distributed in an arbitrary fashion across partitions and processors because they are unrelated. + +**Upsert Mode `{+I, +U, -D}`** +- Messages can contain updates leading to an updating table. +- Updates are related using a key (i.e. the *upsert key*). +- Every message is either an upsert or delete message for a result under the upsert key. +- Messages for the same upsert key should land at the same partition and processor. +- Deletions can contain only values for upsert key columns (i.e. *partial deletes*) or values for + all columns (i.e. *full deletes*). +- The mode is also known as *partial image* in the literature because `-U` messages are missing. + +**Retract Mode `{+I, -U, +U, -D}`** +- Messages can contain updates leading to an updating table. +- Every insertion or update event is a fact that can be "undone" (i.e. retracted). +- Updates are related by all columns. In simplified words: The entire row is kind of the key but duplicates are supported. + For example: `+I['Bob', 42]` is related to `-D['Bob', 42]` and `+U['Alice', 13]` is related to `-U['Alice', 13]`. +- Thus, every message is either an insertion (`+`) or its retraction (`-`). +- The mode is known as *full image* in the literature. + +### Updating Input Tables + +The `ArgumentTrait.SUPPORTS_UPDATES` instructs the system that updates are allowed as input to the given table argument. +By default, a table argument is insert-only and updates will be rejected. + +Input tables become updating when sub queries such as aggregations or outer joins force an incremental computation. 
For +example, the following query only works if the function is able to digest retraction messages: + +```text +// The change +I[1] followed by -U[1], +U[2], -U[2], +U[3] will enter the function +WITH UpdatingTable AS ( + SELECT COUNT(*) FROM (VALUES 1, 2, 3) +) +SELECT * FROM f(table_arg => TABLE UpdatingTable) +``` + +If updates should be supported, ensure that the data type of the table argument is chosen in a way that it can encode +changes. In other words: choose a `Row` type that exposes the `RowKind` change flag. + +The changelog of the backing input table decides which kinds of changes enter the function. The function receives `{+I}` +when the input table is append-only. The function receives `{+I,+U,-D}` if the input table is upserting using the same +upsert key as the partition key. Otherwise, retractions `{+I,-U,+U,-D}` (i.e. including `RowKind.UPDATE_BEFORE`) enter +the function. Use `ArgumentTrait.REQUIRE_UPDATE_BEFORE` to enforce retractions for all updating cases. + +For upserting tables, if the changelog contains key-only deletions (also known as partial deletions), only upsert key +fields are set when a row enters the function. Non-key fields are set to `null`, regardless of `NOT NULL` constraints. +Use `ArgumentTrait.REQUIRE_FULL_DELETE` to enforce that only full deletes enter the function. + +The `SUPPORTS_UPDATES` trait is intended for advanced use cases. Please note that inputs are always insert-only in batch +mode. Thus, if the PTF should produce the same results in both batch and streaming mode, results should be emitted based +on watermarks and event-time. + +#### Enforcing Retract Mode + +The `ArgumentTrait.REQUIRE_UPDATE_BEFORE` instructs the system that a table argument which `SUPPORT_UPDATES` should include +a `RowKind.UPDATE_BEFORE` message when encoding updates. In other words: it enforces presenting the updating table in +retract changelog mode. + +By default, updates are encoded as emitted by the input operation. 
Thus, the updating table might be encoded in upsert +changelog mode and deletes might only contain keys. + +The following example shows how the input changelog encodes updates differently: + +```text +// Given a table UpdatingTable(name STRING PRIMARY KEY, score INT) +// backed by upsert changelog with changes +// +I[Alice, 42], +I[Bob, 0], +U[Bob, 2], +U[Bob, 100], -D[Bob, NULL]. + +SELECT * FROM f(table_arg => TABLE UpdatingTable PARTITION BY name) + +// The following changes will enter the function: +// +I[Alice, 42], +I[Bob, 0], -U[Bob, 0], +U[Bob, 2], -U[Bob, 2], +U[Bob, 100], -U[Bob, 100] + +// In both encodings, a materialized table would only contain a row for Alice. +``` + +#### Enforcing Upserts with Full Deletes + +The `ArgumentTrait.REQUIRE_FULL_DELETE` instructs the system that a table argument which `SUPPORT_UPDATES` should include +all fields in the `RowKind.DELETE` message if the updating table is backed by an upsert changelog. + +For upserting tables, if the changelog contains key-only deletes (also known as partial deletes), only upsert key fields +are set when a row enters the function. Non-key fields are set to null, regardless of NOT NULL constraints. + +The following example shows how the input changelog encodes updates differently: + +```text +// Given a table UpdatingTable(name STRING PRIMARY KEY, score INT) +// backed by upsert changelog with changes +// +I[Alice, 42], +I[Bob, 0], +U[Bob, 2], +U[Bob, 100], -D[Bob, NULL]. 
+
+SELECT * FROM f(table_arg => TABLE UpdatingTable PARTITION BY name)

Review Comment:
nit: maybe mention in a comment that the PTF `f` has the `REQUIRE_FULL_DELETE` trait; examples are often read without the surrounding context.

########## docs/content/docs/dev/table/functions/ptfs.md: ##########
@@ -1093,6 +1093,359 @@ This allows the PTF to focus on the main aggregation without the need to manuall
+
+// The following changes will enter the function:
+// +I[Alice, 42], +I[Bob, 0], +U[Bob, 2], +U[Bob, 100], -D[Bob, 100].
+
+// In both encodings, a materialized table would only contain a row for Alice.
+```
+
+#### Example: Changelog Filtering
+
+The following function demonstrates how a PTF can transform an updating table into an append-only table.
Instead of +applying updates encoded in each `Row`, it incorporates the changelog flag into the payload. The rows emitted by the PTF +are guaranteed to be of `RowKind.INSERT`. By preserving the original changelog flag in the payload, it permits filtering +of specific update types. In this example, it filters out all deletions. + +{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}} +{{< tab "Java" >}} +```java +TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + +Table data = env + .fromValues( + Row.of("Bob", 23), + Row.of("Alice", 42), + Row.of("Alice", 2)) + .as("name", "score"); + +// Since the aggregation is not windowed and potentially unbounded, +// the result is an updating table. Usually, this means that all following +// operations and sinks need to support updates. +Table aggregated = data + .groupBy($("name")) + .select($("name"), $("score").sum().as("sum")); + +// However, the PTF will convert the updating table into an insert-only result. +// Subsequent operations and sinks can easily digest the resulting table. +Table changelog = aggregated + .partitionBy($("name")) + .process(ToChangelogFunction.class); + +// For event-driven applications, filtering on certain CDC events is possible. +Table upsertsOnly = changelog.filter($("flag").in("INSERT", "UPDATE_AFTER")); + +upsertsOnly.execute().print(); + +// -------------------- +// Function declaration +// -------------------- + +@DataTypeHint("ROW<flag STRING, sum INT>") +public static class ToChangelogFunction extends ProcessTableFunction<Row> { + public void eval(@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row input) { + // Forwards the sum column and includes the row's kind as a string column. + Row changelogRow = + Row.of( + input.getKind().toString(), + input.getField("sum")); + + collect(changelogRow); + } +} +``` +{{< /tab >}} +{{< /tabs >}} + +The PTF produces the following output when debugging in a console. 
The `op` section indicates that the result is append-only. The +original flag is encoded in the `flag` column. + +```text ++----+--------------------------------+--------------------------------+-------------+ +| op | name | flag | sum | ++----+--------------------------------+--------------------------------+-------------+ +| +I | Bob | INSERT | 23 | +| +I | Alice | INSERT | 42 | +| +I | Alice | UPDATE_AFTER | 44 | ++----+--------------------------------+--------------------------------+-------------+ +``` + +#### Limitations +- The `ArgumentTrait.PASS_COLUMNS_THROUGH` is not supported if `ArgumentTrait.SUPPORTS_UPDATES` is declared. +- The `on_time` argument is not supported if the PTF receives updates. + +### Updating Function Output + +The `ChangelogFunction` interface makes it possible for a function to declare the types of changes (e.g., inserts, updates, +deletes) that it may emit, allowing the planner to make informed decisions during query planning. + +{{< hint info >}} +The interface is intended for advanced use cases and should be implemented with care. Emitting an incorrect changelog +from the PTF may lead to undefined behavior in the overall query. +{{< /hint >}} + +The resulting changelog mode can be influenced by: +- The changelog mode of the input table arguments, accessible via `ChangelogContext.getTableChangelogMode(int)`. +- The changelog mode required by downstream operators, accessible via `ChangelogContext.getRequiredChangelogMode()`. + +Changelog mode inference in the planner involves several steps. The `getChangelogMode(ChangelogContext)` method is +called for each step: + +1. The planner checks whether the PTF emits updates or inserts-only. +2. If updates are emitted, the planner determines whether the updates include {@link + RowKind#UPDATE_BEFORE} messages (retract mode), or whether {@link RowKind#UPDATE_AFTER} + messages are sufficient (upsert mode). 
For this, {@link #getChangelogMode} might be called + twice to query both retract mode and upsert mode capabilities as indicated by {@link + ChangelogContext#getRequiredChangelogMode()}. +3. If in upsert mode, the planner checks whether {@link RowKind#DELETE} messages contain all + fields (full deletes) or only key fields (partial deletes). In the case of partial deletes, + only the upsert key fields are set when a row is removed; all non-key fields are null, + regardless of nullability constraints. {@link ChangelogContext#getRequiredChangelogMode()} + indicates whether a downstream operator requires full deletes. + +Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see `ArgumentTrait.TABLE_AS_SET`). +In case of upserts, the upsert key must be equal to the PARTITION BY key. + +It is perfectly valid for a `ChangelogFunction` implementation to return a fixed `ChangelogMode`, regardless of the +`ChangelogContext`. This approach may be appropriate when the PTF is designed for a specific scenario or pipeline setup, +and does not need to adapt dynamically to different input modes. Note that in such cases, the PTFs applicability is limited, +as it may only function correctly within the predefined context for which it was designed. + +In some cases, this interface should be used in combination with `SpecializedFunction` +to reconfigure the PTF after the final changelog mode for the specific call location has been +determined. The final changelog mode is also available during runtime via +`ProcessTableFunction.Context.getChangelogMode()`. 
+
+#### Example: Custom Aggregation
+
+The following function demonstrates how a PTF can implement a aggregation function that is able to emit updates based

Review Comment:
```suggestion
The following function demonstrates how a PTF can implement an aggregation function that is able to emit updates based
```

########## docs/content/docs/dev/table/functions/ptfs.md: ##########
@@ -1093,6 +1093,359 @@ This allows the PTF to focus on the main aggregation without the need to manuall
+#### Enforcing Retract Mode
+
+The `ArgumentTrait.REQUIRE_UPDATE_BEFORE` instructs the system that a table argument which `SUPPORT_UPDATES` should include
+a `RowKind.UPDATE_BEFORE` message when encoding updates. In other words: it enforces presenting the updating table in
+retract changelog mode.
+
+By default, updates are encoded as emitted by the input operation. Thus, the updating table might be encoded in upsert
+changelog mode and deletes might only contain keys.
+
+The following example shows how the input changelog encodes updates differently:
+
+```text
+// Given a table UpdatingTable(name STRING PRIMARY KEY, score INT)
+// backed by upsert changelog with changes
+// +I[Alice, 42], +I[Bob, 0], +U[Bob, 2], +U[Bob, 100], -D[Bob, NULL].
+
+SELECT * FROM f(table_arg => TABLE UpdatingTable PARTITION BY name)

Review Comment:
nit: you could mention in a comment that this PTF `f` uses `REQUIRE_UPDATE_BEFORE` and `REQUIRE_FULL_DELETE`.

########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java: ##########
@@ -67,14 +69,27 @@ public interface CallContext {
     /**
      * Returns information about the table that has been passed to a table argument.
      *
-     * <p>Semantics are only available for table arguments (i.e. arguments of a {@link
-     * ProcessTableFunction} that are annotated with {@code @ArgumentHint(TABLE_AS_SET)} or
-     * {@code @ArgumentHint(TABLE_AS_ROW)}).
+     * <p>This method applies only to {@link ProcessTableFunction}s.

Review Comment:
```suggestion
     * <p>This method applies only to {@link ProcessTableFunction ProcessTableFunctions}.
```
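This note is for reference while reviewing the `REQUIRE_UPDATE_BEFORE` example in `ptfs.md`. That example describes an upsert changelog containing a key-only delete that is presented to the function in retract mode. The re-encoding can be sketched in plain Java. This is a hypothetical, simplified illustration and not how the Flink planner implements it. The class `UpsertToRetract` and the record `Change` are made up for this sketch, which assumes Java 17. Each row is reduced to one upsert key plus one integer value, and the last known value per key is held in a map as state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink code): re-encodes an upsert changelog {+I, +U, -D}, possibly
 * containing key-only deletes, as a retract changelog {+I, -U, +U, -D} with full deletes.
 */
final class UpsertToRetract {

    /** One changelog message: a change flag, the upsert key, and a single non-key value. */
    record Change(String kind, String key, Integer value) {
        @Override
        public String toString() {
            return kind + "[" + key + ", " + (value == null ? "NULL" : value) + "]";
        }
    }

    static List<Change> toRetract(List<Change> upserts) {
        // Last known image per upsert key: the state this re-encoding requires.
        Map<String, Integer> lastValue = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (Change c : upserts) {
            boolean known = lastValue.containsKey(c.key());
            switch (c.kind()) {
                case "+I", "+U" -> {
                    if (known) {
                        // Retract the previous image (-U) before emitting the new one (+U).
                        out.add(new Change("-U", c.key(), lastValue.get(c.key())));
                        out.add(new Change("+U", c.key(), c.value()));
                    } else {
                        out.add(new Change("+I", c.key(), c.value()));
                    }
                    lastValue.put(c.key(), c.value());
                }
                case "-D" -> {
                    // Fill the non-key field from state: a key-only delete becomes a full delete.
                    if (known) {
                        out.add(new Change("-D", c.key(), lastValue.remove(c.key())));
                    }
                }
                default -> throw new IllegalArgumentException("Unexpected kind: " + c.kind());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> upserts = List.of(
                new Change("+I", "Alice", 42),
                new Change("+I", "Bob", 0),
                new Change("+U", "Bob", 2),
                new Change("+U", "Bob", 100),
                new Change("-D", "Bob", null)); // key-only delete
        // prints [+I[Alice, 42], +I[Bob, 0], -U[Bob, 0], +U[Bob, 2], -U[Bob, 2], +U[Bob, 100], -D[Bob, 100]]
        System.out.println(toRetract(upserts));
    }
}
```

Under these assumptions, the key-only delete `-D[Bob, NULL]` comes out as the full delete `-D[Bob, 100]`. This is the same final message as in the full-delete example, whereas the quoted retract-mode output ends with `-U[Bob, 100]`.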