kianwee created FLINK-39412:
-------------------------------
Summary: AddColumnEvent fails with duplicate field names when
schema change events are replayed after failover
Key: FLINK-39412
URL: https://issues.apache.org/jira/browse/FLINK-39412
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: kianwee
### Problem
When a Flink CDC pipeline recovers from a checkpoint/savepoint, binlog events
may be replayed, causing {{AddColumnEvent}} to be applied for columns that
already exist in the cached schema. This leads to a {{RowType}} validation
failure:
{code:java}
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: Failed to pre-transform with AddColumnEvent{tableId=ecrm_btwl.kd_store_coupon, addedColumns=[ColumnWithPosition{column=`valid_date` STRING, position=LAST, existedColumnName=null}]}
...
Caused by: java.lang.IllegalArgumentException: Field names must be unique. Found duplicates: [valid_date]
    at org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)
    at org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:230)
{code}
### Root Cause
{{SchemaUtils.applyAddColumnEvent()}} appends columns without checking
whether a column with the same name already exists. Although
{{isSchemaChangeEventRedundant()}} exists as a utility method,
{{PreTransformOperator.cacheChangeSchema()}} does not call it before
applying schema changes.
This can be triggered when:
* A job restores from checkpoint/savepoint and the binlog offset rolls back,
replaying a historical {{ALTER TABLE ADD COLUMN}} DDL.
* The snapshot phase captures a schema that already includes the column, but
the binlog stream still contains the corresponding DDL event.
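The failure mode can be illustrated with a simplified model that does not use any Flink CDC classes. The class {{ReplayedAddColumnDemo}}, the helpers {{applyAddColumn}} and {{validateUnique}}, and the columns {{id}} and {{store_id}} are hypothetical stand-ins. Only the {{valid_date}} column and the error message text come from the stack trace above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplayedAddColumnDemo {

    // Simplified stand-in for row-type field validation: rejects duplicate names.
    static void validateUnique(List<String> fieldNames) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String name : fieldNames) {
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        if (!duplicates.isEmpty()) {
            throw new IllegalArgumentException(
                    "Field names must be unique. Found duplicates: " + duplicates);
        }
    }

    // Non-idempotent apply (assumed behaviour): appends the column
    // unconditionally, then validates the resulting field list.
    static List<String> applyAddColumn(List<String> schema, String column) {
        List<String> updated = new ArrayList<>(schema);
        updated.add(column);
        validateUnique(updated);
        return updated;
    }

    public static void main(String[] args) {
        // First application of ALTER TABLE ADD COLUMN succeeds.
        List<String> cached = applyAddColumn(List.of("id", "store_id"), "valid_date");
        try {
            // The same DDL arrives again after the binlog offset rolls back.
            applyAddColumn(cached, "valid_date");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the second call fails because the cached schema already contains {{valid_date}}, which mirrors the replay scenarios listed above.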
### Fix
Add an idempotency check in {{SchemaUtils.applyAddColumnEvent()}} that skips
columns whose names already exist in the current schema. This is the most
defensive location for the fix because it protects all callers of
{{applySchemaChangeEvent()}}, not just {{PreTransformOperator}}.
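A minimal sketch of what such an idempotency check could look like, written against plain column-name lists rather than the real Flink CDC types. The class {{IdempotentAddColumnSketch}} and the method {{applyAddColumns}} are hypothetical, and the sketch only covers columns appended at the end (the {{position=LAST}} case from the stack trace). It is not the code from the linked PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentAddColumnSketch {

    /**
     * Returns a new column list with the added columns appended.
     * Columns whose name is already present are skipped, so applying
     * the same event twice yields the same schema.
     */
    static List<String> applyAddColumns(List<String> schema, List<String> added) {
        List<String> result = new ArrayList<>(schema);
        Set<String> existing = new HashSet<>(schema);
        for (String name : added) {
            if (!existing.add(name)) {
                // Name already in the schema: treat as a replayed event and skip.
                continue;
            }
            result.add(name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> once = applyAddColumns(List.of("id", "store_id"), List.of("valid_date"));
        List<String> twice = applyAddColumns(once, List.of("valid_date"));
        System.out.println(once.equals(twice)); // true: the replay is a no-op
    }
}
```

Placing the check inside the apply step, as proposed, means callers do not each need their own redundancy guard. A real implementation would also have to decide how to treat a replayed column whose type differs from the existing one, which this sketch does not address.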
PR: https://github.com/apache/flink-cdc/pull/4370
Priority: Major
Type: Bug
