VinaySagarGonabavi commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2881291112


##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:
##########
@@ -840,6 +840,245 @@ void testSchemaChangeWithPostWildcard() throws Exception {
                 .runTests("inserting columns at last");
     }
 
+    /**
+     * This case tests that duplicate AddColumnEvents are handled gracefully by both
+     * PreTransformOperator and PostTransformOperator. When the same column is added twice (e.g.,
+     * from gh-ost online schema migrations), the second event should be filtered out.
+     */
+    @Test
+    void testDuplicateAddColumnEventPreTransform() throws Exception {
+        TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
+        TransformWithSchemaEvolveTestCase.of(
+                        tableId,
+                        "*, id + age as computed",
+                        "name <> 'Alice'",
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT().notNull())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .physicalColumn("computed", DataTypes.INT())
+                                .primaryKey("id")
+                                .build())
+                .initializeHarness()
+                .runTests("initializing table")
+                // First AddColumnEvent: add "extras" column
+                .evolveFromSource(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                AddColumnEvent.ColumnPosition.LAST,
+                                                null))))
+                .expectInPreTransformed(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                AddColumnEvent.ColumnPosition.AFTER,
+                                                "age"))))
+                .expectInPostTransformed(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                AddColumnEvent.ColumnPosition.AFTER,
+                                                "age"))))
+                .runTests("adding extras column first time")
+                // Duplicate AddColumnEvent: add "extras" column again (should be filtered)
+                .evolveFromSource(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                AddColumnEvent.ColumnPosition.LAST,
+                                                null))))
+                .expectNothingInPreTransformed()
+                .expectNothingInPostTransformed()
+                .runTests("duplicate add extras column should be filtered")

Review Comment:
   In the new commit, both testDuplicateAddColumnEventPreTransform() and
testDuplicateAddColumnEventPostTransform() send a DataChangeEvent after the
duplicate AddColumnEvent and assert that it is processed correctly through the
transform pipeline.
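
   For readers following the thread, the behaviour these tests check can be sketched outside the test harness. The snippet below is only an illustration under stated assumptions: `ColumnTracker`, `filterAddColumns`, and `accepts` are hypothetical names invented here, not Flink CDC classes or methods, and the real operators track full schemas rather than bare column names. It shows a repeated add-column request being filtered to nothing, and a row with the evolved width still being accepted afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these types belong to Flink CDC.
class DuplicateAddColumnSketch {

    /** Tracks known column names and filters out add-column requests that are already applied. */
    static final class ColumnTracker {
        private final Set<String> columns = new LinkedHashSet<>();

        ColumnTracker(List<String> initialColumns) {
            columns.addAll(initialColumns);
        }

        /**
         * Registers the requested columns and returns only those that were new.
         * An empty result means the whole request was a duplicate and can be dropped.
         */
        List<String> filterAddColumns(List<String> requested) {
            List<String> fresh = new ArrayList<>();
            for (String name : requested) {
                // Set.add returns false when the column is already known.
                if (columns.add(name)) {
                    fresh.add(name);
                }
            }
            return fresh;
        }

        /** A row is accepted when it carries exactly one value per known column. */
        boolean accepts(Object[] row) {
            return row.length == columns.size();
        }
    }

    public static void main(String[] args) {
        ColumnTracker tracker = new ColumnTracker(Arrays.asList("id", "name", "age"));

        List<String> first = tracker.filterAddColumns(Collections.singletonList("extras"));
        List<String> duplicate = tracker.filterAddColumns(Collections.singletonList("extras"));

        System.out.println("first add forwarded: " + first);
        System.out.println("duplicate add forwarded: " + duplicate);

        // A data change that arrives after the duplicate still matches the evolved schema.
        Object[] row = {1, "Bob", 20, 3.5f};
        System.out.println("row accepted after duplicate: " + tracker.accepts(row));
    }
}
```

   The sketch keeps the schema state idempotent, which is why the data change sent after the duplicate event is the useful assertion: it would fail if the duplicate had altered the tracked schema a second time.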



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.