JunboWang created FLINK-38197:
---------------------------------
Summary: Data Errors Caused by Default Added Columns at End Position
Key: FLINK-38197
URL: https://issues.apache.org/jira/browse/FLINK-38197
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.4.0
Reporter: JunboWang
{*}Problem Description{*}:
When a column is added through a schema change, Flink CDC places it at the end
of the sink table by default, so the column order of the source table (e.g.,
MySQL) and the sink table (e.g., Paimon) diverges. For example, if a new column
{{schema_test_1}} is added after {{schema_test}} in the source table, Flink CDC
still appends {{schema_test_1}} to the end of the sink table. After the job is
restarted with cleared state, the mismatched column positions cause data
misalignment (e.g., field values are mapped to the wrong columns), leading to
data loss or job failure.
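To illustrate the misalignment, the following is a simplified, self-contained model (not Flink CDC or Paimon code). It assumes that row values are bound to sink columns by position, which is the failure mode described above; the class {{PositionMismatchDemo}} and its helper methods are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink CDC code: values are bound to columns by index,
// so the sink's column order must match the source's column order.
class PositionMismatchDemo {

    // Insert newColumn directly after anchor (like MySQL "ADD COLUMN ... AFTER anchor").
    static List<String> addAfter(List<String> columns, String newColumn, String anchor) {
        List<String> result = new ArrayList<>(columns);
        int index = result.indexOf(anchor);
        if (index < 0) {
            throw new IllegalArgumentException("unknown anchor column: " + anchor);
        }
        result.add(index + 1, newColumn);
        return result;
    }

    // Append newColumn at the end (the default behavior reported in this issue).
    static List<String> addLast(List<String> columns, String newColumn) {
        List<String> result = new ArrayList<>(columns);
        result.add(newColumn);
        return result;
    }

    // Pair each value with the sink column at the same index.
    static Map<String, Object> bindByPosition(List<String> sinkColumns, List<Object> values) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < sinkColumns.size(); i++) {
            row.put(sinkColumns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> base = List.of("id", "schema_test", "name");
        List<String> source = addAfter(base, "schema_test_1", "schema_test");
        List<String> sink = addLast(base, "schema_test_1");
        // A row in source column order: id, schema_test, schema_test_1, name
        List<Object> values = List.of(1, "a", "b", "alice");
        System.out.println(source); // [id, schema_test, schema_test_1, name]
        System.out.println(sink);   // [id, schema_test, name, schema_test_1]
        System.out.println(bindByPosition(sink, values));
        // {id=1, schema_test=a, name=b, schema_test_1=alice}
    }
}
{code}

Under these assumptions, the value intended for {{schema_test_1}} lands in {{name}}, and the value of {{name}} lands in {{schema_test_1}}.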
{*}Root Cause{*}:
The flaw is in the schema change handling logic. When processing an add-column
event, the {{lenientizeAddColumnEvent}} method does not retain the column
position specified in the source table (e.g., {{AFTER}} a certain column).
Instead, it rebuilds each added column with the single-argument constructor of
{{AddColumnEvent.ColumnWithPosition}}, which forces the position to {{LAST}}
(end of table). As a result, the column structures of the source and sink
tables cannot be kept in sync.
{code:java}
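// In lenientizeAddColumnEvent: each added column is rebuilt with the
// one-argument ColumnWithPosition constructor, so the original position
// (e.g. AFTER schema_test) and its anchor column are discarded.
private static Stream<SchemaChangeEvent> lenientizeAddColumnEvent(
        AddColumnEvent schemaChangeEvent, TableId tableId) {
    return Stream.of(
            new AddColumnEvent(
                    tableId,
                    schemaChangeEvent.getAddedColumns().stream()
                            .map(
                                    col ->
                                            new AddColumnEvent.ColumnWithPosition(
                                                    Column.physicalColumn(
                                                            col.getAddColumn().getName(),
                                                            // the type is relaxed to nullable
                                                            col.getAddColumn().getType().nullable(),
                                                            col.getAddColumn().getComment(),
                                                            col.getAddColumn()
                                                                    .getDefaultValueExpression())))
                            .collect(Collectors.toList())));
}

// In AddColumnEvent.ColumnWithPosition: the one-argument constructor
// always defaults the position to LAST.
public ColumnWithPosition(Column addColumn) {
    this.addColumn = addColumn;
    position = ColumnPosition.LAST;
    existedColumnName = null;
}
{code}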
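One possible direction for a fix is to carry the position and anchor column over when rebuilding each added column, presumably by passing {{col.getPosition()}} and {{col.getExistedColumnName()}} to a multi-argument {{ColumnWithPosition}} constructor (assuming one is available). The sketch below only models that idea: {{LenientAddColumnSketch}} and its nested types are hypothetical, simplified stand-ins for the Flink CDC classes (comment and default value are omitted), and the real API may differ.

{code:java}
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins for the Flink CDC types; NOT the real classes.
class LenientAddColumnSketch {

    enum ColumnPosition { FIRST, LAST, BEFORE, AFTER }

    record Column(String name, String type, boolean nullable) {
        // Mirrors getType().nullable() in the quoted code: same column, nullable type.
        Column asNullable() {
            return new Column(name, type, true);
        }
    }

    record ColumnWithPosition(Column addColumn, ColumnPosition position, String existedColumnName) {
        // Mirrors the quoted one-argument constructor: position defaults to LAST.
        ColumnWithPosition(Column addColumn) {
            this(addColumn, ColumnPosition.LAST, null);
        }
    }

    // Reported behavior: only the column is carried over, so the position falls back to LAST.
    static List<ColumnWithPosition> lenientizeDroppingPosition(List<ColumnWithPosition> added) {
        return added.stream()
                .map(col -> new ColumnWithPosition(col.addColumn().asNullable()))
                .collect(Collectors.toList());
    }

    // Position-preserving variant: relax nullability but keep the position and anchor column.
    static List<ColumnWithPosition> lenientizePreservingPosition(List<ColumnWithPosition> added) {
        return added.stream()
                .map(col -> new ColumnWithPosition(
                        col.addColumn().asNullable(), col.position(), col.existedColumnName()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ColumnWithPosition event = new ColumnWithPosition(
                new Column("schema_test_1", "INT", false), ColumnPosition.AFTER, "schema_test");
        System.out.println(lenientizeDroppingPosition(List.of(event)).get(0).position());   // LAST
        System.out.println(lenientizePreservingPosition(List.of(event)).get(0).position()); // AFTER
    }
}
{code}

Keeping the anchor column name alongside the position matters because {{AFTER}} and {{BEFORE}} are meaningless without it.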
--
This message was sent by Atlassian Jira
(v8.20.10#820010)