VinaySagarGonabavi commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2881294953


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
 
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+        Set<String> existingColumnNames =
+                columns.stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+            // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
+            // (e.g., from gh-ost online schema migrations)
+            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+                continue;
+            }

Review Comment:
   Added a LOG field to SchemaUtils. When skipping a duplicate column, the code now compares the DataType of the existing and incoming columns and logs a WARN-level message if they differ.
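   A minimal, dependency-free sketch of the behavior described above, under stated assumptions: `Column` here is a hypothetical simplified record (name plus a type string), not Flink CDC's `org.apache.flink.cdc.common.schema.Column`, which carries a real `DataType`. The class name `DuplicateColumnSketch`, the method `applyAddColumns`, and the log message are illustrative only and are not taken from the PR. It uses `java.util.logging` to stay self-contained; the real `LOG` field would use whatever logging API `SchemaUtils` imports. Column positions are ignored and new columns are simply appended.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

final class DuplicateColumnSketch {
    // Hypothetical, simplified stand-in for a schema column: a name plus a
    // data type rendered as a string (the real class carries a DataType object).
    record Column(String name, String dataType) {}

    // java.util.logging keeps the sketch dependency-free; this is only a
    // stand-in for the LOG field mentioned in the review comment.
    private static final Logger LOG =
            Logger.getLogger(DuplicateColumnSketch.class.getName());

    // Appends each incoming column unless a column with the same name already
    // exists. Duplicates are skipped; a type mismatch is logged as a warning.
    // Column positions are ignored: every new column is appended at the end.
    static List<Column> applyAddColumns(List<Column> oldColumns, List<Column> added) {
        List<Column> columns = new ArrayList<>(oldColumns);
        Map<String, Column> existingByName = new HashMap<>();
        for (Column column : columns) {
            existingByName.put(column.name(), column);
        }
        for (Column incoming : added) {
            Column existing = existingByName.get(incoming.name());
            if (existing != null) {
                if (!existing.dataType().equals(incoming.dataType())) {
                    LOG.warning(String.format(
                            "Skipping duplicate column '%s': existing type %s differs from incoming type %s",
                            incoming.name(), existing.dataType(), incoming.dataType()));
                }
                continue; // keep the existing definition unchanged
            }
            columns.add(incoming);
            // Also track newly added names so a repeat within one event is skipped.
            existingByName.put(incoming.name(), incoming);
        }
        return columns;
    }
}
```

   Using a map rather than a plain name set lets the skip path look up the existing column's type for the comparison. This sketch also records each newly added name, so a column repeated within the same event is skipped too; the excerpt does not show whether the actual patch does this.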