Copilot commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2867220304


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -215,6 +220,24 @@ private CreateTableEvent cacheCreateTable(CreateTableEvent 
event) {
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent 
event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = 
preTransformChangeInfoMap.get(tableId);
+
+        // Filter out redundant AddColumnEvent columns that already exist in 
the schema
+        // to handle duplicate events from tools like gh-ost online schema 
migrations
+        if (event instanceof AddColumnEvent) {
+            AddColumnEvent addColumnEvent = (AddColumnEvent) event;
+            Schema currentSchema = tableChangeInfo.getSourceSchema();
+            Optional<AddColumnEvent> filtered =
+                    SchemaUtils.filterRedundantAddColumns(currentSchema, 
addColumnEvent);
+            if (!filtered.isPresent()) {
+                LOG.debug(
+                        "Skipping fully redundant AddColumnEvent for table {} "
+                                + "- all columns already exist",
+                        tableId);
+                return Optional.empty();
+            }
+            event = filtered.get();
+        }

Review Comment:
   In `processEvent`, `preTransformProcessorMap.remove(tableId)` is executed 
for every `SchemaChangeEvent`. With the new early-return here, a 
fully-redundant `AddColumnEvent` will return `Optional.empty()` before 
`cachePreTransformProcessor(...)` is called, so the processor map entry is 
never rebuilt. This can cause subsequent `DataChangeEvent`s for the table to 
fail the `processor != null` check and crash the pipeline. Ensure the processor 
is re-cached even when the AddColumnEvent is filtered to a no-op (or avoid 
removing the processor map entry in the redundant case).



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:
##########
@@ -840,6 +840,245 @@ void testSchemaChangeWithPostWildcard() throws Exception {
                 .runTests("inserting columns at last");
     }
 
+    /**
+     * This case tests that duplicate AddColumnEvents are handled gracefully 
by both
+     * PreTransformOperator and PostTransformOperator. When the same column is 
added twice (e.g.,
+     * from gh-ost online schema migrations), the second event should be 
filtered out.
+     */
+    @Test
+    void testDuplicateAddColumnEventPreTransform() throws Exception {
+        TableId tableId = TableId.tableId("my_company", "my_branch", 
"data_changes");
+        TransformWithSchemaEvolveTestCase.of(
+                        tableId,
+                        "*, id + age as computed",
+                        "name <> 'Alice'",
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .physicalColumn("computed", DataTypes.INT())
+                                .primaryKey("id")
+                                .build())
+                .initializeHarness()
+                .runTests("initializing table")
+                // First AddColumnEvent: add "extras" column
+                .evolveFromSource(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                
AddColumnEvent.ColumnPosition.LAST,
+                                                null))))
+                .expectInPreTransformed(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                
AddColumnEvent.ColumnPosition.AFTER,
+                                                "age"))))
+                .expectInPostTransformed(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                
AddColumnEvent.ColumnPosition.AFTER,
+                                                "age"))))
+                .runTests("adding extras column first time")
+                // Duplicate AddColumnEvent: add "extras" column again (should 
be filtered)
+                .evolveFromSource(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("extras", DataTypes.FLOAT()),
+                                                
AddColumnEvent.ColumnPosition.LAST,
+                                                null))))
+                .expectNothingInPreTransformed()
+                .expectNothingInPostTransformed()
+                .runTests("duplicate add extras column should be filtered")

Review Comment:
   These duplicate-AddColumnEvent tests stop immediately after asserting the 
redundant event is filtered. To prevent regressions like losing internal 
transform processors/state, it would be useful to also process at least one 
subsequent `DataChangeEvent` after the duplicate schema event and assert the 
pipeline continues to transform records correctly (i.e., no operator crash / 
missing schema view).
   ```suggestion
                   .runTests("duplicate add extras column should be filtered")
                   // After the duplicate AddColumnEvent has been filtered, 
process a subsequent
                   // DataChangeEvent to verify that the pipeline continues to 
transform records
                   // correctly (i.e., no operator crash or missing schema 
view).
                   // The concrete DataChangeEvent and its expected transformed 
output should follow
                   // the same construction patterns as other tests in this 
class.
                   // For example (pseudocode, to be aligned with existing 
helpers in this test):
                   // .evolveFromSource(
                   //         someInsertOrUpdateDataChangeEventFor(tableId, id, 
name, age, extras))
                   // .expectInPreTransformed(expectedPreTransformEvent)
                   // .expectInPostTransformed(expectedPostTransformEvent)
                   // .runTests("data change after duplicate add extras column 
is still processed")
   ```



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, 
SchemaChangeEvent eve
 
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema 
oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+        Set<String> existingColumnNames =
+                columns.stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : 
event.getAddedColumns()) {
+            // Skip columns that already exist in the schema to handle 
duplicate AddColumnEvents
+            // (e.g., from gh-ost online schema migrations)
+            if 
(existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+                continue;
+            }

Review Comment:
   `applyAddColumnEvent` now silently skips adding a column when the name 
already exists. This can mask upstream inconsistencies (e.g., same column name 
but different type/comment/default), leaving the schema potentially out of sync 
with the source without any signal. Consider validating that the existing 
column definition matches the incoming `addColumn` (and throw or at least 
log/warn when it differs) so only true duplicates are treated as idempotent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to