JunboWang created FLINK-38197:
---------------------------------

             Summary: Data Errors Caused by Default Added Columns at End Position
                 Key: FLINK-38197
                 URL: https://issues.apache.org/jira/browse/FLINK-38197
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
            Reporter: JunboWang


{*}Problem Description{*}:
When a column is added through a schema change, Flink CDC always places it at
the end of the sink table, regardless of the position specified in the source
DDL. This leaves the source table (e.g., MySQL) and the sink table (e.g.,
Paimon) with different column orders. For example, if a new column
{{schema_test_1}} is added {{AFTER}} {{schema_test}} in the source table,
Flink CDC appends it as the last column of the sink table instead. After the
job is restarted with cleared state, the mismatched column positions cause
data misalignment (field values are mapped to the wrong columns), leading to
data loss or job failure.
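To make the misalignment concrete, the sketch below assumes a writer that maps each record's values to sink columns purely by ordinal position. The class and method names ({{PositionalMisalignmentDemo}}, {{mapByPosition}}) and the extra column {{name}} are hypothetical, and the real Paimon sink path may resolve fields differently. With {{schema_test_1}} appended at the end of the sink, the value meant for {{schema_test_1}} lands in {{name}} and vice versa.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PositionalMisalignmentDemo {

    // Pairs each value with the column name at the same ordinal in the target
    // schema (hypothetical model of a purely position-based writer).
    public static Map<String, Object> mapByPosition(
            List<String> targetColumns, List<Object> values) {
        if (targetColumns.size() != values.size()) {
            throw new IllegalArgumentException("column count mismatch");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < targetColumns.size(); i++) {
            row.put(targetColumns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Source (e.g. MySQL) after: ADD COLUMN schema_test_1 ... AFTER schema_test
        List<String> sourceColumns = List.of("id", "schema_test", "schema_test_1", "name");
        // Sink (e.g. Paimon) after the new column was appended at the end
        List<String> sinkColumns = List.of("id", "schema_test", "name", "schema_test_1");

        // One change record, serialized in source column order
        List<Object> values = List.of(1, "a", "new-value", "alice");

        // source view: {id=1, schema_test=a, schema_test_1=new-value, name=alice}
        System.out.println("source view: " + mapByPosition(sourceColumns, values));
        // sink view:   {id=1, schema_test=a, name=new-value, schema_test_1=alice}
        System.out.println("sink view:   " + mapByPosition(sinkColumns, values));
    }
}
{code}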
 
{*}Root Cause{*}:
The flaw is in the schema change handling logic. When processing an
add-column event, the {{lenientizeAddColumnEvent}} method rebuilds each added
column but does not carry over the position specified in the source table
(e.g., {{AFTER}} a given column). Instead, it creates each
{{AddColumnEvent.ColumnWithPosition}} through its single-argument constructor,
which hard-codes the position to {{LAST}} (end of table) and clears the
reference column name. As a result, the sink table's column order diverges
from the source table's.
 
{code:java}
// Flawed method: each added column is rebuilt with a nullable type, but its
// position (FIRST / BEFORE / AFTER) and reference column are discarded.
private static Stream<SchemaChangeEvent> lenientizeAddColumnEvent(
        AddColumnEvent schemaChangeEvent, TableId tableId) {
    return Stream.of(
            new AddColumnEvent(
                    tableId,
                    schemaChangeEvent.getAddedColumns().stream()
                            .map(
                                    col ->
                                            // Single-argument constructor: position defaults to LAST
                                            new AddColumnEvent.ColumnWithPosition(
                                                    Column.physicalColumn(
                                                            col.getAddColumn().getName(),
                                                            col.getAddColumn()
                                                                    .getType()
                                                                    .nullable(),
                                                            col.getAddColumn().getComment(),
                                                            col.getAddColumn()
                                                                    .getDefaultValueExpression())))
                            .collect(Collectors.toList())));
}

// AddColumnEvent.ColumnWithPosition: the constructor used above
public ColumnWithPosition(Column addColumn) {
    this.addColumn = addColumn;
    position = ColumnPosition.LAST;
    existedColumnName = null;
}
{code}
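A position-preserving variant would copy each column's placement instead of relying on the single-argument constructor. The sketch below is a simplified, self-contained model, not Flink CDC code: {{Position}} and the {{ColumnWithPosition}} record are hypothetical stand-ins that track only a name, a nullability flag, and the placement. In the real method, the equivalent change would presumably pass {{col.getPosition()}} and {{col.getExistedColumnName()}} to a three-argument {{ColumnWithPosition}} constructor; those accessors are assumed here and should be checked against the cdc-3.4.0 sources.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class AddColumnPositionSketch {

    // Hypothetical stand-in for AddColumnEvent.ColumnPosition.
    public enum Position { FIRST, LAST, BEFORE, AFTER }

    // Hypothetical stand-in for AddColumnEvent.ColumnWithPosition (name + nullability only).
    public record ColumnWithPosition(
            String name, boolean nullable, Position position, String existedColumnName) {}

    // Mirrors the reported behavior: type is relaxed, but placement is forced to LAST.
    public static ColumnWithPosition lenientizeDroppingPosition(ColumnWithPosition col) {
        return new ColumnWithPosition(col.name(), true, Position.LAST, null);
    }

    // Position-preserving variant: type is relaxed, placement is copied unchanged.
    public static ColumnWithPosition lenientizeKeepingPosition(ColumnWithPosition col) {
        return new ColumnWithPosition(col.name(), true, col.position(), col.existedColumnName());
    }

    // Applies one add-column request to an ordered list of column names.
    public static List<String> apply(List<String> columns, ColumnWithPosition col) {
        List<String> result = new ArrayList<>(columns);
        switch (col.position()) {
            case FIRST -> result.add(0, col.name());
            case LAST -> result.add(col.name());
            case BEFORE, AFTER -> {
                int idx = result.indexOf(col.existedColumnName());
                if (idx < 0) {
                    throw new IllegalArgumentException(
                            "unknown reference column: " + col.existedColumnName());
                }
                result.add(col.position() == Position.AFTER ? idx + 1 : idx, col.name());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> columns = List.of("id", "schema_test", "name");
        // Source DDL equivalent: ADD COLUMN schema_test_1 ... AFTER schema_test
        ColumnWithPosition added =
                new ColumnWithPosition("schema_test_1", false, Position.AFTER, "schema_test");

        // [id, schema_test, schema_test_1, name]
        System.out.println(apply(columns, added));
        // [id, schema_test, name, schema_test_1]  (reported behavior)
        System.out.println(apply(columns, lenientizeDroppingPosition(added)));
        // [id, schema_test, schema_test_1, name]  (matches the source order)
        System.out.println(apply(columns, lenientizeKeepingPosition(added)));
    }
}
{code}

Running {{main}} shows that the dropping variant reproduces the reported order ({{schema_test_1}} last), while the keeping variant matches the source order and still relaxes nullability. The sketch needs Java 16+ (records and arrow-form {{switch}}).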



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
