VinaySagarGonabavi commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2881294953
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+        Set<String> existingColumnNames =
+                columns.stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+            // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
+            // (e.g., from gh-ost online schema migrations)
+            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+                continue;
+            }
Review Comment:
Added a LOG field to SchemaUtils. When skipping a duplicate column, the code now compares the DataType of the existing column with that of the incoming column and logs a WARN if they differ.
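A minimal sketch of the skip-and-warn behavior described above, under simplified assumptions: AddColumnDedupSketch, its nested Column record (with the data type as a plain String) and applyAddColumns are hypothetical stand-ins, not Flink CDC classes; ColumnWithPosition placement is ignored (every new column is appended); and java.util.logging stands in for whatever logger SchemaUtils actually uses. It keeps a name-to-column map rather than a name set so the duplicate's data type can be compared before the column is skipped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger;

final class AddColumnDedupSketch {

    // Hypothetical stand-in for Flink CDC's Column; the data type is a plain String here.
    record Column(String name, String dataType) {}

    // Stand-in for the LOG field added to SchemaUtils (assumption: the real code may use
    // SLF4J; java.util.logging keeps this sketch dependency-free).
    private static final Logger LOG = Logger.getLogger(AddColumnDedupSketch.class.getName());

    private AddColumnDedupSketch() {}

    /**
     * Appends each incoming column to the end of the column list, skipping any column whose
     * name already exists and logging a warning when the duplicate's data type differs.
     */
    static List<Column> applyAddColumns(List<Column> oldColumns, List<Column> addedColumns) {
        List<Column> columns = new ArrayList<>(oldColumns);
        // Index existing columns by name so a duplicate's data type can be compared.
        Map<String, Column> existingByName = new HashMap<>();
        for (Column column : columns) {
            existingByName.put(column.name(), column);
        }
        for (Column incoming : addedColumns) {
            Column existing = existingByName.get(incoming.name());
            if (existing != null) {
                if (!Objects.equals(existing.dataType(), incoming.dataType())) {
                    LOG.warning(String.format(
                            "Skipping duplicate column '%s': existing type %s differs from incoming type %s",
                            incoming.name(), existing.dataType(), incoming.dataType()));
                }
                continue; // keep the existing column unchanged
            }
            columns.add(incoming);
            // Track the newly added column so a repeated name in the same event is also skipped.
            existingByName.put(incoming.name(), incoming);
        }
        return columns;
    }
}
```

For example, applying [name VARCHAR(10), age INT] to a schema of [id INT, name STRING] yields [id, name, age], keeps name as STRING, and logs one warning for the type mismatch.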