cheng qian created FLINK-38741:
----------------------------------
Summary: MODIFY COLUMN position change events are ignored, causing
schema evolution failure
Key: FLINK-38741
URL: https://issues.apache.org/jira/browse/FLINK-38741
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.5.0
Environment: * {*}Flink CDC Version{*}: 3.5.0
* {*}Flink Version{*}: 1.19.1
* {*}Source Database{*}: MySQL 8.0
* {*}Sink{*}: Apache Paimon
* {*}Deployment Mode{*}: Pipeline
Reporter: cheng qian
h3. Summary
When using MySQL CDC Pipeline to synchronize data to Paimon, {{ALTER TABLE
MODIFY COLUMN}} operations that change column order are *ignored* and not
propagated to the downstream sink, causing schema evolution failure and
potential data inconsistency.
h3. Problem Statement
When executing DDL statements like {{ALTER TABLE MODIFY COLUMN col2
VARCHAR(100) AFTER col3}} to change column order in MySQL, the schema change
event is:
# *Filtered out* by the schema evolution framework, OR
# *Not processed* by the Paimon MetadataApplier
This causes:
* Schema inconsistency between source and sink
* Schema evolution failure
* Potential data corruption after job restart
h2. Steps to Reproduce
h3. 1. Environment Setup
{*}MySQL Table Structure{*}:
{code:sql}
CREATE TABLE test_column_order (
  id INT PRIMARY KEY,
  col1 INT,
  col2 VARCHAR(50),
  col3 VARCHAR(50)
);

INSERT INTO test_column_order VALUES
  (1, 1, 'b1', 'c1'),
  (2, 2, 'b2', 'c2'),
  (3, 3, 'b3', 'c3'); {code}
*Flink CDC Pipeline Configuration* ({{mysql-to-paimon.yaml}}):
{code:yaml}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: password
  tables: test_db.test_column_order
  server-id: 5400-5404

sink:
  type: paimon
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /tmp/paimon

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 1

route:
  - source-table: test_db.test_column_order
    sink-table: paimon_db.test_column_order {code}
h3. 2. Start Pipeline
{code:bash}
./bin/flink-cdc.sh mysql-to-paimon.yaml {code}
h3. 3. Verify Initial Synchronization
{code:sql}
-- Query in Paimon
SELECT * FROM paimon_db.test_column_order; {code}
Expected column order: id, col1, col2, col3
h3. 4. Execute Column Order Change
{code:sql}
-- Execute in MySQL
ALTER TABLE test_column_order
  MODIFY COLUMN col2 VARCHAR(50) AFTER col3;

-- Verify column order in MySQL
DESC test_column_order; {code}
MySQL column order becomes: id, col1, col3, col2 ✅
h3. 5. Check Paimon Table Structure
{code:sql}
-- Query table structure in Paimon
DESC paimon_db.test_column_order; {code}
*Expected Result*: Column order should be id, col1, col3, col2
*Actual Result*: Column order remains id, col1, col2, col3 ❌
h3. 6. Insert New Data for Verification
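{code:sql}
-- Insert new data in MySQL
-- Column order is now id, col1, col3, col2, so col3 = 'c4' and col2 = 'b4'
INSERT INTO test_column_order VALUES (4, 4, 'c4', 'b4'); {code}
{*}Expected{*}: Data in Paimon should be {{(4, 4, 'c4', 'b4')}} in the column order id, col1, col3, col2, i.e. col3 = 'c4' and col2 = 'b4'.
{*}Actual{*}: Because the Paimon table still uses the order id, col1, col2, col3, values may be misaligned (for example, 'c4' landing in col2) or synchronization may fail.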
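The misalignment risk comes from pairing a row's values with a stale column order by index. The Java sketch below is purely illustrative: {{PositionalMisalignment}} and {{bindByPosition}} are hypothetical names, and the code does not claim to reflect how the Paimon sink actually writes rows. It only shows what a positional writer would produce for the row inserted above.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustration only (not Flink CDC or Paimon code): binds a row's values to
 * column names purely by index, the way a positional writer would.
 */
final class PositionalMisalignment {

    /** Pairs each value with the column name at the same index. */
    static Map<String, Object> bindByPosition(List<String> columns, List<Object> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("column/value count mismatch");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // MySQL order after: MODIFY COLUMN col2 VARCHAR(50) AFTER col3
        List<String> sourceOrder = List.of("id", "col1", "col3", "col2");
        // Paimon order, never updated because the event was dropped
        List<String> staleSinkOrder = List.of("id", "col1", "col2", "col3");
        // INSERT INTO test_column_order VALUES (4, 4, 'c4', 'b4')
        List<Object> values = List.of(4, 4, "c4", "b4");

        System.out.println("source: " + bindByPosition(sourceOrder, values));
        // prints: source: {id=4, col1=4, col3=c4, col2=b4}
        System.out.println("sink:   " + bindByPosition(staleSinkOrder, values));
        // prints: sink:   {id=4, col1=4, col2=c4, col3=b4}
    }
}
{code}

The same four values end up under different column names in the two cases, which is exactly the swap between col2 and col3 described above.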
h2. Expected Behavior
# The MySQL CDC source should capture {{MODIFY COLUMN}} position change events
# The schema evolution framework should propagate column reordering events instead of discarding them
# The Paimon MetadataApplier should apply the column reordering to the sink table
# The sink table's column order should stay consistent with the source table
h2. Actual Behavior
# {{MODIFY COLUMN}} position change events are ignored
# Sink table column order remains unchanged
# No error logs or warning messages
# Schema evolution fails silently
h2. Root Cause Analysis
h3. Possible Causes
h4. 1. MySQL CDC Connector Not Capturing Column Position Information
At the {{MySqlStreamingChangeEventSource}} or Debezium level, only column type
changes may be captured while position changes are ignored.
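If the fix belongs at this layer, the connector has to compare column order, not just column types, to notice a position-only change. A minimal Java sketch, assuming the before/after column definitions are available as ordered name-to-type maps (types are plain strings here rather than Flink CDC {{DataType}} objects; {{ColumnChangeClassifier}} is a hypothetical name):

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;

/**
 * Hypothetical helper (not part of Flink CDC): classifies how a table's column
 * definitions changed. Maps are ordered by column position: name -> type.
 */
final class ColumnChangeClassifier {

    enum Kind { NO_CHANGE, POSITION_ONLY, OTHER }

    static Kind classify(LinkedHashMap<String, String> before, LinkedHashMap<String, String> after) {
        // Map.equals ignores iteration order, so this compares names and types only.
        if (!before.equals(after)) {
            return Kind.OTHER;
        }
        // Same columns and types: the only possible difference left is ordering.
        boolean sameOrder = new ArrayList<>(before.keySet()).equals(new ArrayList<>(after.keySet()));
        return sameOrder ? Kind.NO_CHANGE : Kind.POSITION_ONLY;
    }

    /** Builds an ordered column map from alternating name/type arguments. */
    static LinkedHashMap<String, String> columns(String... nameTypePairs) {
        if (nameTypePairs.length % 2 != 0) {
            throw new IllegalArgumentException("expected name/type pairs");
        }
        LinkedHashMap<String, String> cols = new LinkedHashMap<>();
        for (int i = 0; i < nameTypePairs.length; i += 2) {
            cols.put(nameTypePairs[i], nameTypePairs[i + 1]);
        }
        return cols;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> before =
                columns("id", "INT", "col1", "INT", "col2", "VARCHAR(50)", "col3", "VARCHAR(50)");
        LinkedHashMap<String, String> after =
                columns("id", "INT", "col1", "INT", "col3", "VARCHAR(50)", "col2", "VARCHAR(50)");
        System.out.println(classify(before, after)); // POSITION_ONLY
    }
}
{code}

A type-only comparison, which is all an {{AlterColumnTypeEvent}} enables, would classify the reproduction's DDL as "no change"; the order comparison is what separates it.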
h4. 2. Schema Evolution Framework Filtering "Position-Only" Events
In {{SchemaChangeEventVisitor}}, "position-only changes" might be
considered invalid schema changes and filtered out.
h4. 3. Paimon MetadataApplier Not Supporting Column Reordering
{{PaimonMetadataApplier}} may not have implemented column reordering logic.
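If reordering is implemented in the applier, the source's new column order has to be turned into a sequence of "move column" operations on the sink table. The plain-Java sketch below computes such a plan under that assumption; {{ColumnReorderPlanner}} and {{Move}} are hypothetical, and translating each move into Paimon catalog calls is deliberately left out.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

/**
 * Hypothetical planner (not Paimon or Flink CDC API): computes column moves that
 * turn the sink's current column order into the source's column order.
 */
final class ColumnReorderPlanner {

    /** One move: place {@code column} first, or directly after {@code after}. */
    static final class Move {
        final String column;
        final String after; // null means FIRST

        Move(String column, String after) {
            this.column = column;
            this.after = after;
        }

        @Override
        public String toString() {
            return after == null ? column + " FIRST" : column + " AFTER " + after;
        }
    }

    static List<Move> plan(List<String> current, List<String> target) {
        if (current.size() != target.size()
                || !new HashSet<>(current).equals(new HashSet<>(target))) {
            throw new IllegalArgumentException("reordering requires the same set of columns");
        }
        List<String> working = new ArrayList<>(current);
        List<Move> moves = new ArrayList<>();
        for (int i = 0; i < target.size(); i++) {
            String wanted = target.get(i);
            if (working.get(i).equals(wanted)) {
                continue; // prefix [0, i] already matches the target
            }
            // The prefix matches, so 'wanted' must sit somewhere after index i.
            working.remove(wanted);
            working.add(i, wanted);
            moves.add(new Move(wanted, i == 0 ? null : target.get(i - 1)));
        }
        return moves;
    }

    public static void main(String[] args) {
        List<String> sink = List.of("id", "col1", "col2", "col3");
        List<String> source = List.of("id", "col1", "col3", "col2");
        System.out.println(plan(sink, source)); // [col3 AFTER col1]
    }
}
{code}

For the reproduction table the plan is a single move, {{col3 AFTER col1}}, which yields the same final order as the original {{col2 AFTER col3}} DDL. The greedy approach fixes one position per step while keeping the matched prefix intact, so it emits at most n - 1 moves for n columns.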
h3. Related Code Locations
# When an {{ALTER TABLE ... MODIFY/CHANGE COLUMN}} statement is executed, the MySQL CDC source emits an {{AlterColumnTypeEvent}}. This event carries only the column's new and old types, not its position.
{code:java}
// flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java
public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {

    private static final long serialVersionUID = 1L;

    private final TableId tableId;

    /** key => column name, value => column type after changing. */
    private final Map<String, DataType> typeMapping;

    private final Map<String, DataType> oldTypeMapping;

    // ...
}{code}
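# When {{MODIFY COLUMN}} only moves a column, the type in the {{AlterColumnTypeEvent}} is identical to the column's current type. {{SchemaUtils.isSchemaChangeEventRedundant}} therefore treats the event as already applied and discards it, so it is never propagated downstream.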
{code:java}
// flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
public static boolean isSchemaChangeEventRedundant(
        @Nullable Schema currentSchema, SchemaChangeEvent event) {
    Optional<Schema> latestSchema = Optional.ofNullable(currentSchema);
    return Boolean.TRUE.equals(
            SchemaChangeEventVisitor.visit(
                    event,
                    addColumnEvent -> {
                        // ...
                    },
                    alterColumnTypeEvent -> {
                        // It has not been applied if schema does not even exist
                        if (!latestSchema.isPresent()) {
                            return false;
                        }
                        Schema schema = latestSchema.get();

                        // It has been applied only if all column types are set as expected.
                        // Column position is never compared, so a position-only
                        // MODIFY COLUMN is always reported as redundant.
                        for (Map.Entry<String, DataType> entry :
                                alterColumnTypeEvent.getTypeMapping().entrySet()) {
                            if (!schema.getColumn(entry.getKey()).isPresent()
                                    || !schema.getColumn(entry.getKey())
                                            .get()
                                            .getType()
                                            .equals(entry.getValue())) {
                                return false;
                            }
                        }
                        return true;
                    },
                    // ...
                    ));
}
{code}
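One possible direction, shown only as a simplified sketch: make the redundancy check compare position as well as type. {{AlterColumnTypeEvent}} carries no position today, so {{ModifyColumn}} below is a hypothetical stand-in with an optional FIRST/AFTER target, and the schema is modeled as an ordered map of column name to type string rather than a Flink CDC {{Schema}}. None of these names exist in Flink CDC.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical, simplified variant of the redundancy check (not Flink CDC code):
 * a MODIFY COLUMN counts as already applied only if type AND position match.
 */
final class PositionAwareRedundancyCheck {

    /** Illustrative stand-in for an event that also carries the column's position. */
    static final class ModifyColumn {
        final String column;
        final String newType;
        final boolean hasPosition; // true if the DDL contained FIRST or AFTER
        final String predecessor;  // null together with hasPosition == true means FIRST

        private ModifyColumn(String column, String newType, boolean hasPosition, String predecessor) {
            this.column = column;
            this.newType = newType;
            this.hasPosition = hasPosition;
            this.predecessor = predecessor;
        }

        static ModifyColumn typeOnly(String column, String newType) {
            return new ModifyColumn(column, newType, false, null);
        }

        static ModifyColumn first(String column, String newType) {
            return new ModifyColumn(column, newType, true, null);
        }

        static ModifyColumn after(String column, String newType, String predecessor) {
            return new ModifyColumn(column, newType, true, predecessor);
        }
    }

    /** schema: ordered column name -> type, as the sink currently sees it. */
    static boolean isRedundant(LinkedHashMap<String, String> schema, ModifyColumn event) {
        String currentType = schema.get(event.column);
        // Not applied yet if the column is missing or has a different type.
        if (currentType == null || !currentType.equals(event.newType)) {
            return false;
        }
        // No position requested: same outcome as the existing type-only check.
        if (!event.hasPosition) {
            return true;
        }
        // Applied only if the column already sits at the requested position.
        List<String> names = new ArrayList<>(schema.keySet());
        int index = names.indexOf(event.column);
        String currentPredecessor = index == 0 ? null : names.get(index - 1);
        return Objects.equals(currentPredecessor, event.predecessor);
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> sink = new LinkedHashMap<>();
        sink.put("id", "INT");
        sink.put("col1", "INT");
        sink.put("col2", "VARCHAR(50)");
        sink.put("col3", "VARCHAR(50)");

        System.out.println(isRedundant(sink, ModifyColumn.typeOnly("col2", "VARCHAR(50)")));      // true
        System.out.println(isRedundant(sink, ModifyColumn.after("col2", "VARCHAR(50)", "col3"))); // false
    }
}
{code}

With the reproduction schema, a type-only event for col2 is still reported as redundant, matching the current behavior, while an event that also says AFTER col3 is not, so it would be forwarded to the sink instead of being dropped.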
h2. Impact
h3. Severity: Critical
# {*}Data Consistency Issues{*}: Schema inconsistency between source and sink
tables
# {*}Data Misalignment Risk{*}: May cause data to be written to wrong columns
after restart
# {*}Production Environment Risk{*}: Affects any Pipeline-mode job whose source tables have columns reordered
# {*}Silent Failure{*}: No error messages, making the issue hard to detect