[ 
https://issues.apache.org/jira/browse/FLINK-37334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MOBIN updated FLINK-37334:
--------------------------
    Description: 
Steps to reproduce:
{code:sql}
ALTER TABLE cdc_sink19
ADD COLUMN a1_after1 BIGINT AFTER a1;
{code}
Log (the add-column AFTER event is rewritten to add-column LAST):
2025-02-17 14:49:22,184 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 1 - Start deducing evolved schema change for 
AddColumnEvent{tableId=dw_app.cdc_sink19, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#ff0000}AFTER{color}, existedColumnName=a1}]}
2025-02-17 14:49:22,184 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 2 - Affected downstream tables are: [rt_ods.cdc_sink19_add_column]
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 3.1 - For to-be-evolved table rt_ods.cdc_sink19_add_column with schema 
columns={`id` DECIMAL(20, 0) NOT NULL '主键ID',`a1` VARCHAR(50),`a1_after` 
BIGINT,`ab2` BIGINT,`table_name` STRING NOT NULL}, primaryKeys=id, options=()...
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 3.2 - upstream dependency tables are: [dw_app.cdc_sink19]
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 3.3 - It's an one-by-one routing and could be forwarded as 
AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#ff0000}AFTER{color}, existedColumnName=a1}]}.
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 4 - After being normalized with LENIENT behavior, final schema change 
events are: [AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#ff0000}LAST{color}, existedColumnName=null}]}]
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
All sink subtask have flushed for table dw_app.cdc_sink19. Start to apply 
schema change request:
        SchemaChangeRequest{tableId=dw_app.cdc_sink19, 
schemaChangeEvent=AddColumnEvent{tableId=dw_app.cdc_sink19, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, position=AFTER, 
existedColumnName=a1}]}, subTaskId=0}
that extracts to:
        AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#ff0000}LAST{color}, existedColumnName=null}]}
2025-02-17 14:49:22,307 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Successfully applied schema change event 
AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#ff0000}LAST{color}, existedColumnName=null}]} to external 
system.
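
The rewrite between Step 3.3 and Step 4 of the log can be modelled roughly as below. This is a minimal sketch, not Flink CDC source code: the class LenientAddColumnSketch, its nested types, and the method normalizeLenient are hypothetical names. The rule it encodes (position forced to LAST, existedColumnName cleared) is inferred only from the log lines above.

```java
// Hypothetical model of the rewrite seen in the log; not Flink CDC source code.
public class LenientAddColumnSketch {

    /** Column positions that appear in the logged ColumnWithPosition entries. */
    enum Position { AFTER, LAST }

    /** Simplified stand-in for the logged ColumnWithPosition structure. */
    static final class ColumnWithPosition {
        final String column;
        final Position position;
        final String existedColumnName; // null when position is LAST

        ColumnWithPosition(String column, Position position, String existedColumnName) {
            this.column = column;
            this.position = position;
            this.existedColumnName = existedColumnName;
        }

        @Override
        public String toString() {
            return "ColumnWithPosition{column=" + column
                    + ", position=" + position
                    + ", existedColumnName=" + existedColumnName + "}";
        }
    }

    /**
     * Assumed rule, inferred from Step 4 of the log: the added column keeps its
     * definition, but its position is forced to LAST and the anchor column is dropped.
     */
    static ColumnWithPosition normalizeLenient(ColumnWithPosition in) {
        return new ColumnWithPosition(in.column, Position.LAST, null);
    }

    public static void main(String[] args) {
        // Values taken from the AddColumnEvent in the log.
        ColumnWithPosition upstream =
                new ColumnWithPosition("`a1_after1` BIGINT", Position.AFTER, "a1");
        System.out.println("before: " + upstream);
        System.out.println("after:  " + normalizeLenient(upstream));
    }
}
```

For the sink to honour the original position, position=AFTER and existedColumnName=a1 would have to survive this step. The log alone does not settle whether the change belongs in the LENIENT normalization or elsewhere.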

 

  was:
Repeat step:
{color:#800000}ALTER{color} {color:#800000}TABLE{color} 
{color:#8e00c6}cdc_sink19{color}

{color:#800000}add{color} {color:#800000}COLUMN{color} a1_after1 
{color:#000080}bigint{color} {color:#800000}after{color} 
{color:#000000}a1{color}



log: [add column after event becomes add column last]
2025-02-17 14:49:22,184 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 1 - Start deducing evolved schema change for 
AddColumnEvent{tableId=dw_app.cdc_sink19, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#FF0000}AFTER{color}, existedColumnName=a1}]}
2025-02-17 14:49:22,184 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 2 - Affected downstream tables are: [rt_ods.cdc_sink19_add_column]
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 3.1 - For to-be-evolved table rt_ods.cdc_sink19_add_column with schema 
columns=\{`id` DECIMAL(20, 0) NOT NULL '主键ID',`a1` VARCHAR(50),`a1_after` 
BIGINT,`ab2` BIGINT,`table_name` STRING NOT NULL}, primaryKeys=id, options=()...
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 3.2 - upstream dependency tables are: [dw_app.cdc_sink19]
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 3.3 - It's an one-by-one routing and could be forwarded as 
AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#FF0000}AFTER{color}, existedColumnName=a1}]}.
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Step 4 - After being normalized with LENIENT behavior, final schema change 
events are: [AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#FF0000}LAST{color}, existedColumnName=null}]}]
2025-02-17 14:49:22,185 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
All sink subtask have flushed for table dw_app.cdc_sink19. Start to apply 
schema change request:
        SchemaChangeRequest\{tableId=dw_app.cdc_sink19, 
schemaChangeEvent=AddColumnEvent{tableId=dw_app.cdc_sink19, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, position=AFTER, 
existedColumnName=a1}]}, subTaskId=0}
that extracts to:
        AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#FF0000}LAST{color}, existedColumnName=null}]}
2025-02-17 14:49:22,307 INFO  
org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
Successfully applied schema change event 
AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
position={color:#FF0000}LAST{color}, existedColumnName=null}]} to external 
system.

 


> Fix mysql's add column after event becomes add column last
> ----------------------------------------------------------
>
>                 Key: FLINK-37334
>                 URL: https://issues.apache.org/jira/browse/FLINK-37334
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.3.0
>            Reporter: MOBIN
>            Priority: Major
>
> Steps to reproduce:
> {code:sql}
> ALTER TABLE cdc_sink19
> ADD COLUMN a1_after1 BIGINT AFTER a1;
> {code}
> Log (the add-column AFTER event is rewritten to add-column LAST):
> 2025-02-17 14:49:22,184 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Step 1 - Start deducing evolved schema change for 
> AddColumnEvent{tableId=dw_app.cdc_sink19, 
> addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
> position={color:#ff0000}AFTER{color}, existedColumnName=a1}]}
> 2025-02-17 14:49:22,184 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Step 2 - Affected downstream tables are: [rt_ods.cdc_sink19_add_column]
> 2025-02-17 14:49:22,185 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Step 3.1 - For to-be-evolved table rt_ods.cdc_sink19_add_column with schema 
> columns={`id` DECIMAL(20, 0) NOT NULL '主键ID',`a1` VARCHAR(50),`a1_after` 
> BIGINT,`ab2` BIGINT,`table_name` STRING NOT NULL}, primaryKeys=id, 
> options=()...
> 2025-02-17 14:49:22,185 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Step 3.2 - upstream dependency tables are: [dw_app.cdc_sink19]
> 2025-02-17 14:49:22,185 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Step 3.3 - It's an one-by-one routing and could be forwarded as 
> AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
> addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
> position={color:#ff0000}AFTER{color}, existedColumnName=a1}]}.
> 2025-02-17 14:49:22,185 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Step 4 - After being normalized with LENIENT behavior, final schema change 
> events are: [AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
> addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
> position={color:#ff0000}LAST{color}, existedColumnName=null}]}]
> 2025-02-17 14:49:22,185 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> All sink subtask have flushed for table dw_app.cdc_sink19. Start to apply 
> schema change request:
>         SchemaChangeRequest{tableId=dw_app.cdc_sink19, 
> schemaChangeEvent=AddColumnEvent{tableId=dw_app.cdc_sink19, 
> addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, position=AFTER, 
> existedColumnName=a1}]}, subTaskId=0}
> that extracts to:
>         AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
> addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
> position={color:#ff0000}LAST{color}, existedColumnName=null}]}
> 2025-02-17 14:49:22,307 INFO  
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator [] - 
> Successfully applied schema change event 
> AddColumnEvent{tableId=rt_ods.cdc_sink19_add_column, 
> addedColumns=[ColumnWithPosition{column=`a1_after1` BIGINT, 
> position={color:#ff0000}LAST{color}, existedColumnName=null}]} to external 
> system.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
