kianwee created FLINK-39412:
-------------------------------

             Summary: AddColumnEvent fails with duplicate field names when 
schema change events are replayed after failover
                 Key: FLINK-39412
                 URL: https://issues.apache.org/jira/browse/FLINK-39412
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: kianwee


### Problem               
          
  When a Flink CDC pipeline recovers from a checkpoint/savepoint, binlog events 
may be replayed, causing \{{AddColumnEvent}} to be applied for columns that 
already exist in the cached schema. This leads to a \{{RowType}} validation 
failure:       
                            
  \{code:java}                                                                  
                                                                                
                                                                                
      
  
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: 
Failed to pre-transform with
      AddColumnEvent\{tableId=ecrm_btwl.kd_store_coupon, 
addedColumns=[ColumnWithPosition{column=`valid_date` STRING, position=LAST, 
existedColumnName=null}]}                                                       
                                 
  ...                                                                           
                                                                                
                                                                                
     
  Caused by: java.lang.IllegalArgumentException: Field names must be unique. 
Found duplicates: [valid_date]                                                  
                                                                                
        
      at 
org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)      
                                                                                
                                                                            
      at 
org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:230)
  \{code}                                                                       
                                                                                
                                                                                
      
                                                                                
                      
### Root Cause                                                                  
                                                                                
                                                                                
   
                                                                                
                      
  \{{SchemaUtils.applyAddColumnEvent()}} blindly adds columns without checking 
if a column with the same name already exists. While 
\{{isSchemaChangeEventRedundant()}} exists as a utility method, 
\{{PreTransformOperator.cacheChangeSchema()}} does  
  not call it before applying schema changes.
                                                                                
                                                                                
                                                                                
     
  This can be triggered when:                                                   
                      
  * A job restores from checkpoint/savepoint and the binlog offset rolls back, 
replaying a historical \{{ALTER TABLE ADD COLUMN}} DDL.
  * The snapshot phase captures a schema that already includes the column, but 
the binlog stream still contains the corresponding DDL event.                   
                                                                                
      
                                                                                
                                                                                
                                                                                
     
 ### Fix                                                                        
                                                                                
                                                                                
    
                                                                                
                                                                                
                                                                                
     
  Add an idempotency check in \{{SchemaUtils.applyAddColumnEvent()}} to skip 
columns whose name already exists in the current schema. This is the most 
defensive fix location since it protects all callers of 
\{{applySchemaChangeEvent()}}, not just 
  \{{PreTransformOperator}}.
                                                                                
                                                                                
                                                                                
     
  PR: https://github.com/apache/flink-cdc/pull/4370                             
                      
                            
  Priority: Major                                                               
                                                                                
                                                                                
     
                                    
  Type: Bug                                                                     
                              



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to