twalthr commented on code in PR #26880:
URL: https://github.com/apache/flink/pull/26880#discussion_r2262588859
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java:
##########
@@ -907,4 +907,65 @@ public class MultiJoinTestPrograms {
+ "INNER JOIN OrdersWithRowtime o
ON u.user_id_0 = o.user_id_1 "
+ "INNER JOIN Payments p ON
u.user_id_0 = p.user_id_2")
.build();
+
+ public static final TableTestProgram MULTI_JOIN_MIXED_CHANGELOG_MODES =
+ TableTestProgram.of(
+ "three-way-mixed-changelog-modes",
+ "three way join with mixed changelog modes and
primary key configurations")
+
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
+ .setupTableSource(
+ SourceTestStep.newBuilder("AppendTable")
+ .addSchema("id STRING PRIMARY KEY NOT
ENFORCED, val STRING")
+ .addOption("changelog-mode", "I")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "1",
"append1"),
+ Row.ofKind(RowKind.INSERT, "2",
"append2"),
+ Row.ofKind(RowKind.INSERT, "3",
"append3"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("RetractTable")
+ .addSchema("ref_id STRING, data STRING")
+ .addOption("changelog-mode", "I,UA,UB,D")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "1",
"retract1"),
+ Row.ofKind(RowKind.INSERT, "2",
"retract2"),
+ Row.ofKind(RowKind.INSERT, "3",
"retract3"),
+ Row.ofKind(RowKind.DELETE, "3",
"retract3"),
+ Row.ofKind(RowKind.INSERT, "1",
"retract1_new"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("UpsertTable")
+ .addSchema(
+ "key_id STRING PRIMARY KEY NOT
ENFORCED, status STRING")
+ .addOption("changelog-mode", "I,UA,D")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "1",
"active"),
+ Row.ofKind(RowKind.INSERT, "2",
"pending"),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"2", "pending"),
Review Comment:
Upsert should not contain an update before.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java:
##########
@@ -907,4 +907,65 @@ public class MultiJoinTestPrograms {
+ "INNER JOIN OrdersWithRowtime o
ON u.user_id_0 = o.user_id_1 "
+ "INNER JOIN Payments p ON
u.user_id_0 = p.user_id_2")
.build();
+
+ public static final TableTestProgram MULTI_JOIN_MIXED_CHANGELOG_MODES =
+ TableTestProgram.of(
+ "three-way-mixed-changelog-modes",
+ "three way join with mixed changelog modes and
primary key configurations")
+
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
+ .setupTableSource(
+ SourceTestStep.newBuilder("AppendTable")
+ .addSchema("id STRING PRIMARY KEY NOT
ENFORCED, val STRING")
+ .addOption("changelog-mode", "I")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "1",
"append1"),
+ Row.ofKind(RowKind.INSERT, "2",
"append2"),
+ Row.ofKind(RowKind.INSERT, "3",
"append3"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("RetractTable")
+ .addSchema("ref_id STRING, data STRING")
+ .addOption("changelog-mode", "I,UA,UB,D")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "1",
"retract1"),
+ Row.ofKind(RowKind.INSERT, "2",
"retract2"),
+ Row.ofKind(RowKind.INSERT, "3",
"retract3"),
+ Row.ofKind(RowKind.DELETE, "3",
"retract3"),
+ Row.ofKind(RowKind.INSERT, "1",
"retract1_new"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("UpsertTable")
+ .addSchema(
+ "key_id STRING PRIMARY KEY NOT
ENFORCED, status STRING")
+ .addOption("changelog-mode", "I,UA,D")
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "1",
"active"),
+ Row.ofKind(RowKind.INSERT, "2",
"pending"),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"2", "pending"),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"2", "active"),
+ Row.ofKind(RowKind.INSERT, "3",
"inactive"),
+ Row.ofKind(RowKind.DELETE, "3",
"inactive"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(
+ "id STRING",
+ "val STRING",
+ "data STRING",
+ "status STRING")
+ .addOption("changelog-mode", "I,UA,UB,D")
Review Comment:
sinks use a different flag, otherwise the mode is not considered:
`sink-changelog-mode-enforced`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]