JunboWang created FLINK-38197:
---------------------------------

             Summary: Data Errors Caused by Default Added Columns at End Position
                 Key: FLINK-38197
                 URL: https://issues.apache.org/jira/browse/FLINK-38197
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
            Reporter: JunboWang
{*}Problem Description{*}: When new columns are added through a schema change, Flink CDC places them at the end of the sink table by default, so the column order of the source table (e.g., MySQL) and the sink table (e.g., Paimon) diverges. For example, if a new column {{schema_test_1}} is added after {{schema_test}} in the source table, Flink CDC appends {{schema_test_1}} to the end of the sink table instead. After the job is restarted with its state cleared, the mismatched column positions cause data misalignment (e.g., values mapped to the wrong fields), leading to data loss or job failure.

{*}Root Cause{*}: The schema change handling logic discards the column position. When processing an add-column event, the {{lenientizeAddColumnEvent}} method does not carry over the position specified in the source table (e.g., {{AFTER}} a certain column). Instead, it builds each {{AddColumnEvent.ColumnWithPosition}} with the single-argument constructor, which forces the position to {{LAST}} (the end of the table). As a result, the column order of the sink table cannot stay in sync with the source table.
{code:java}
private static Stream<SchemaChangeEvent> lenientizeAddColumnEvent(
        AddColumnEvent schemaChangeEvent, TableId tableId) {
    return Stream.of(
            new AddColumnEvent(
                    tableId,
                    schemaChangeEvent.getAddedColumns().stream()
                            .map(
                                    col ->
                                            // Only the column itself is passed; its position
                                            // and anchor column (existedColumnName) are dropped.
                                            new AddColumnEvent.ColumnWithPosition(
                                                    Column.physicalColumn(
                                                            col.getAddColumn().getName(),
                                                            col.getAddColumn()
                                                                    .getType()
                                                                    .nullable(),
                                                            col.getAddColumn().getComment(),
                                                            col.getAddColumn()
                                                                    .getDefaultValueExpression())))
                            .collect(Collectors.toList())));
}

// AddColumnEvent.ColumnWithPosition: the single-argument constructor
// always sets the position to LAST.
public ColumnWithPosition(Column addColumn) {
    this.addColumn = addColumn;
    position = ColumnPosition.LAST;
    existedColumnName = null;
}
{code}
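One possible direction for a fix is to keep the original position and anchor column when lenientizing, instead of calling the single-argument constructor. The standalone Java program below sketches that idea and the resulting misalignment. All of its types and methods (ColumnPosition, ColumnWithPosition, lenientizeDroppingPosition, lenientizeKeepingPosition, apply, mapByPosition) are hypothetical, simplified stand-ins, not the Flink CDC API. In Flink CDC itself the equivalent change would presumably pass col.getPosition() and col.getExistedColumnName() into a three-argument ColumnWithPosition constructor; that signature is assumed here, not verified. Writing a source-ordered row into the sink purely by position is likewise a simplification of what the sink actually does.

```java
import java.util.ArrayList;
import java.util.List;

public class LenientAddColumnSketch {

    // Hypothetical stand-in for Flink CDC's ColumnPosition.
    enum ColumnPosition { FIRST, LAST, BEFORE, AFTER }

    // Hypothetical stand-in for AddColumnEvent.ColumnWithPosition (column reduced to name + nullability).
    record ColumnWithPosition(String name, boolean nullable, ColumnPosition position, String existedColumnName) {
        // Mirrors the single-argument constructor from the issue: position is forced to LAST.
        ColumnWithPosition(String name, boolean nullable) {
            this(name, nullable, ColumnPosition.LAST, null);
        }
    }

    // Current behaviour: nullability is relaxed, but position and anchor column are lost.
    static ColumnWithPosition lenientizeDroppingPosition(ColumnWithPosition col) {
        return new ColumnWithPosition(col.name(), true);
    }

    // Sketched fix: relax nullability but keep the original position and anchor column.
    static ColumnWithPosition lenientizeKeepingPosition(ColumnWithPosition col) {
        return new ColumnWithPosition(col.name(), true, col.position(), col.existedColumnName());
    }

    // Applies an add-column change to an ordered list of column names.
    static List<String> apply(List<String> columns, ColumnWithPosition col) {
        List<String> result = new ArrayList<>(columns);
        switch (col.position()) {
            case FIRST -> result.add(0, col.name());
            case LAST -> result.add(col.name());
            case BEFORE -> result.add(anchorIndex(result, col), col.name());
            case AFTER -> result.add(anchorIndex(result, col) + 1, col.name());
        }
        return result;
    }

    // Index of the anchor column; fails loudly instead of silently inserting at a wrong index.
    static int anchorIndex(List<String> columns, ColumnWithPosition col) {
        int idx = columns.indexOf(col.existedColumnName());
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown anchor column: " + col.existedColumnName());
        }
        return idx;
    }

    // Pairs sink column names with source-ordered values purely by position (simplified model).
    static String mapByPosition(List<String> sinkColumns, List<Object> sourceRow) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < sinkColumns.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(sinkColumns.get(i)).append('=').append(sourceRow.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> base = List.of("id", "schema_test", "name");
        ColumnWithPosition added =
                new ColumnWithPosition("schema_test_1", false, ColumnPosition.AFTER, "schema_test");

        List<String> source = apply(base, added);
        List<String> sinkBuggy = apply(base, lenientizeDroppingPosition(added));
        List<String> sinkFixed = apply(base, lenientizeKeepingPosition(added));
        List<Object> row = List.of(1, "a", "new", "bob"); // values in source column order

        System.out.println("source:     " + source);
        System.out.println("sink (bug): " + sinkBuggy);
        System.out.println("sink (fix): " + sinkFixed);
        System.out.println("bug row:    " + mapByPosition(sinkBuggy, row));
    }
}
```

With the position dropped, {{schema_test_1}} ends up after {{name}}, so the value meant for {{schema_test_1}} lands in {{name}} and vice versa. With the position kept, the sink order matches the source.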