gustavodemorais commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3258993550


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Upsert output

Review Comment:
   ```suggestion
   #### Upsert table
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Upsert output
+
+When `PARTITION BY` is combined with an `op_mapping` that does NOT include 
`UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition 
columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event 
with the partition key acting as the upsert key.
+
+```sql
+-- Upsert input: INSERT / UPDATE_AFTER / DELETE only
+-- +I[id:1, op:'INSERT',       name:'Alice']
+-- +I[id:2, op:'INSERT',       name:'Bob']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE',       name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id,
+  op_mapping => MAP[
+    'INSERT',       'INSERT',
+    'UPDATE_AFTER', 'UPDATE_AFTER',
+    'DELETE',       'DELETE']
+)
+
+-- Output (upsert changelog, upsert key = id):
+-- +I[id:1, name:'Alice']
+-- +I[id:2, name:'Bob']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+```
+
+Without `PARTITION BY`, or when the active `op_mapping` includes 
`UPDATE_BEFORE`, the output remains a retract changelog.

Review Comment:
   ```suggestion
   By default, without `PARTITION BY`, or when the active `op_mapping` includes 
`UPDATE_BEFORE`, the output remains a retract changelog.
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
      * <p>For row semantics (each row processed independently), use {@link 
Table#fromChangelog} on
      * the unpartitioned table.
      *
+     * <p>Output changelog mode:
+     *
+     * <ul>
+     *   <li><b>Retract</b> (default): the active {@code op_mapping} includes 
{@code UPDATE_BEFORE}
+     *       or no updates at all. The output emits {@code INSERT}, {@code 
UPDATE_BEFORE}, {@code
+     *       UPDATE_AFTER}, and {@code DELETE}.
+     *   <li><b>Upsert</b>: the {@code op_mapping} maps to {@code 
UPDATE_AFTER} without {@code
+     *       UPDATE_BEFORE}. The output emits {@code INSERT}, {@code 
UPDATE_AFTER}, and full {@code

Review Comment:
   ```suggestion
        *       UPDATE_BEFORE}. The output emits {@code INSERT}, {@code 
UPDATE_AFTER}, and {@code
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
 
+    // 
--------------------------------------------------------------------------------------------
+    // Changelog mode inference
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Emits an upsert changelog when the input is partitioned (set semantics) 
and the resolved
+     * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code 
UPDATE_BEFORE}. In all other
+     * cases the output is a retract changelog. When upsert mode is selected, 
the partition key acts
+     * as the upsert key.
+     *
+     * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) 
upsert(false)})
+     * because the runtime forwards each input delete row with all fields 
populated; only the {@link
+     * org.apache.flink.types.RowKind} is rewritten.

Review Comment:
   ```suggestion
        * <p>Upsert mode uses full deletes by default ({@link 
ChangelogMode#upsert(boolean) upsert(false)}). We currently only support 
consuming from a changelog stream with full deletes (where not only the primary 
keys, but all fields fields are populated).
   ```
   
   nit: we could maybe even delete this and add only more info when we add 
"consume_full_updates"



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -182,10 +183,26 @@ public RelWriter explainTerms(RelWriter pw) {
         for (Ord<RelNode> ord : Ord.zip(inputs)) {
             pw.input("input#" + ord.i, ord.e);
         }
-        return pw.item("invocation", scan.getCall())
+        pw.item("invocation", scan.getCall())
                 .item("uid", uid)
                 .item("select", String.join(",", getRowType().getFieldNames()))
                 .item("rowType", getRowType());
+        final Set<ImmutableBitSet> upsertKeys =

Review Comment:
   I don't think you want to do this. This would change the plan output for all 
PTFs and you'd theoretically have to regenerate all PTF tests in the code base.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java:
##########
@@ -182,10 +183,26 @@ public RelWriter explainTerms(RelWriter pw) {
         for (Ord<RelNode> ord : Ord.zip(inputs)) {
             pw.input("input#" + ord.i, ord.e);
         }
-        return pw.item("invocation", scan.getCall())
+        pw.item("invocation", scan.getCall())
                 .item("uid", uid)
                 .item("select", String.join(",", getRowType().getFieldNames()))
                 .item("rowType", getRowType());
+        final Set<ImmutableBitSet> upsertKeys =

Review Comment:
   You just want to output the upsertKeys in the plan, right? Then you want to 
use/thinker your tests to use the testing utility that allows you do so instead 
of changing the default explain for ptf nodes
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
      * <p>For row semantics (each row processed independently), use {@link 
Table#fromChangelog} on
      * the unpartitioned table.
      *
+     * <p>Output changelog mode:
+     *
+     * <ul>
+     *   <li><b>Retract</b> (default): the active {@code op_mapping} includes 
{@code UPDATE_BEFORE}
+     *       or no updates at all. The output emits {@code INSERT}, {@code 
UPDATE_BEFORE}, {@code

Review Comment:
   ```suggestion
        *       or only {@code INSERT} and {@code DELETE} pairs. The output 
emits {@code INSERT}, {@code UPDATE_BEFORE}, {@code
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Upsert output
+
+When `PARTITION BY` is combined with an `op_mapping` that does NOT include 
`UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition 
columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event 
with the partition key acting as the upsert key.

Review Comment:
   ```suggestion
   To generate an `Upsert table`, the following requirements must be met:
   
   * **Key Partitioning:** You must use `PARTITION BY <key>`, where the 
partition key corresponds to the unique/primary key of the dataset.
   * **Op Mapping Configuration:** The `op_mapping` must include `UPDATE_AFTER` 
and must NOT include `UPDATE_BEFORE`. 
   
   **How it works:**
   The engine assumes that the keys provided in the `PARTITION BY` clause 
function as the unique upsert keys. The resulting output changelog becomes an 
upsert table keyed on these partition columns. Each incoming row is evaluated 
and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition 
key as the explicit upsert key. Therefore, if the incoming changelog contains 
unique keys (such as a primary key), they **must** be used in the `PARTITION 
BY` clause.
   
   The FROM_CHANGELOG PTF assumes events arrived ordered. If the source itself 
does not guarantee ordering for events for the same PARTITION BY keys, consider 
using using ORDER BY <link>.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
      * <p>For row semantics (each row processed independently), use {@link 
Table#fromChangelog} on
      * the unpartitioned table.
      *
+     * <p>Output changelog mode:
+     *
+     * <ul>
+     *   <li><b>Retract</b> (default): the active {@code op_mapping} includes 
{@code UPDATE_BEFORE}
+     *       or no updates at all. The output emits {@code INSERT}, {@code 
UPDATE_BEFORE}, {@code

Review Comment:
   ```suggestion
        *       or no updates at all. The output possibly emits {@code INSERT}, 
{@code UPDATE_BEFORE}, {@code
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -226,6 +226,18 @@ public interface PartitionedTable {
      * <p>For row semantics (each row processed independently), use {@link 
Table#fromChangelog} on
      * the unpartitioned table.
      *
+     * <p>Output changelog mode:
+     *
+     * <ul>
+     *   <li><b>Retract</b> (default): the active {@code op_mapping} includes 
{@code UPDATE_BEFORE}
+     *       or no updates at all. The output emits {@code INSERT}, {@code 
UPDATE_BEFORE}, {@code
+     *       UPDATE_AFTER}, and {@code DELETE}.
+     *   <li><b>Upsert</b>: the {@code op_mapping} maps to {@code 
UPDATE_AFTER} without {@code

Review Comment:
   ```suggestion
        *   <li><b>Upsert</b>: the {@code op_mapping} maps {@code UPDATE_AFTER} 
without {@code
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
 
+    // 
--------------------------------------------------------------------------------------------
+    // Changelog mode inference
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Emits an upsert changelog when the input is partitioned (set semantics) 
and the resolved
+     * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code 
UPDATE_BEFORE}. In all other
+     * cases the output is a retract changelog. When upsert mode is selected, 
the partition key acts
+     * as the upsert key.
+     *
+     * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) 
upsert(false)})
+     * because the runtime forwards each input delete row with all fields 
populated; only the {@link
+     * org.apache.flink.types.RowKind} is rewritten.
+     */
+    public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+            ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : 
ChangelogMode.all();
+
+    /**
+     * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert 
changelog: the input
+     * table is partitioned AND the resolved {@code op_mapping} contains 
{@code UPDATE_AFTER}
+     * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the 
mapping is absent or
+     * cannot be resolved as a literal, since the default mapping includes 
both (retract).

Review Comment:
   ```suggestion
        * cannot be resolved as a literal. The default mapping maps to retract 
table.
   ```
   
   



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +98,50 @@ public Optional<List<DataType>> inferInputTypes(
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
 
+    // 
--------------------------------------------------------------------------------------------
+    // Changelog mode inference
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Emits an upsert changelog when the input is partitioned (set semantics) 
and the resolved
+     * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code 
UPDATE_BEFORE}. In all other
+     * cases the output is a retract changelog. When upsert mode is selected, 
the partition key acts
+     * as the upsert key.
+     *
+     * <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) 
upsert(false)})
+     * because the runtime forwards each input delete row with all fields 
populated; only the {@link
+     * org.apache.flink.types.RowKind} is rewritten.
+     */
+    public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+            ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : 
ChangelogMode.all();

Review Comment:
   The whole change here  in this file is very nice and easy to read 🙂



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Upsert output

Review Comment:
   This is a fair point to keep this simple. I thought a bit and still think 
it'd be good to have the section. We have retract as the default mode. The 
changelog mode of a table is an user facing option which users are aware which 
users are often dealing with. So a section telling them what they have to pay 
attention to when they want to fabricate an upsert table makes sense imo
   
   



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -200,6 +200,69 @@ public class FromChangelogTestPrograms {
                                     + "input => TABLE cdc_stream PARTITION BY 
id)")
                     .build();
 
+    public static final TableTestProgram UPSERT_PARTITION_BY =
+            TableTestProgram.of(
+                            "from-changelog-upsert-partition-by",
+                            "PARTITION BY + op_mapping without UPDATE_BEFORE 
produces an "
+                                    + "upsert changelog keyed on the partition 
columns")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("name STRING", "id INT", "op 
STRING")
+                                    .producedValues(
+                                            Row.of("Alice", 1, "INSERT"),
+                                            Row.of("Bob", 2, "INSERT"),
+                                            Row.of("Alice2", 1, 
"UPDATE_AFTER"),
+                                            Row.of("Bob", 2, "DELETE"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT PRIMARY KEY NOT 
ENFORCED", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream PARTITION BY 
id, "
+                                    + "op_mapping => MAP["
+                                    + "'INSERT', 'INSERT', "
+                                    + "'UPDATE_AFTER', 'UPDATE_AFTER', "
+                                    + "'DELETE', 'DELETE'])")
+                    .build();
+
+    public static final TableTestProgram UPSERT_PARTITION_BY_CUSTOM_MAPPING =

Review Comment:
   Agree with @fhueske 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to