CloverDew opened a new issue, #10647:
URL: https://github.com/apache/seatunnel/issues/10647

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   **What happened:** 
   When running a Flink job with a MySQL CDC source and a JDBC exactly-once sink 
connector with schema evolution enabled, the job hangs indefinitely at a 
checkpoint during DDL operations. The hang occurs specifically when DDL 
events (ALTER TABLE) are processed concurrently with the XA transactions used by 
the exactly-once sink.
   ```
   2026-03-24 11:43:03,144 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 7 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774345383143 for job 
4dabfd84185b4f842ef1738f89d84465.
   # Job hangs here indefinitely, no further checkpoint progress
   ```
   
   **Root Cause:** 
   The hang is caused by a self-deadlock in the 
`JdbcExactlyOnceSinkWriter.reOpenOutputFormat()` method:
   
   1. When a DDL event occurs, `reOpenOutputFormat()` first calls `this.prepareCommit()`, which puts an XA transaction into the PREPARED state.
   2. The PREPARED XA transaction holds MySQL metadata locks (MDL) on the table.
   3. `reOpenOutputFormat()` then tries to execute ALTER TABLE on the same connection.
   4. The ALTER TABLE is blocked by the MDL held by the prepared XA transaction on the same connection.
   5. The result is a self-deadlock: the connection waits for itself.
   
   - The issue occurs in the translation layer, when SchemaOperator forwards DDL events to JdbcExactlyOnceSinkWriter.
   - MySQL XA transactions acquire metadata locks that conflict with DDL operations.
   - The prepared XA transaction from the previous checkpoint blocks the current DDL execution.
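
The ordering problem described under Root Cause can be illustrated with a toy model. This is only a sketch: `XaDdlOrderingSketch`, `MetadataLocks`, and `tryAlter` are hypothetical names invented for illustration, not SeaTunnel classes or MySQL APIs, and the lock bookkeeping is a deliberate simplification of real MDL behavior. It only shows why running the DDL after the prepare step, but before the prepared transaction is finished, leaves the DDL with nothing to wait for except that same transaction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of the ordering problem; not SeaTunnel or MySQL code. */
public class XaDdlOrderingSketch {

    /** Tracks which XA transaction id (if any) still holds a metadata lock per table. */
    static final class MetadataLocks {
        private final Map<String, String> holderByTable = new HashMap<>();

        /** Models XA PREPARE: the transaction keeps its lock on the table. */
        void prepare(String xid, String table) {
            holderByTable.put(table, xid);
        }

        /** Models XA COMMIT: every lock held by this xid is released. */
        void commit(String xid) {
            holderByTable.values().removeIf(xid::equals);
        }

        /** Returns the xid currently blocking DDL on the table, if any. */
        Optional<String> blocker(String table) {
            return Optional.ofNullable(holderByTable.get(table));
        }
    }

    /** Returns true if ALTER TABLE could proceed, false if it would wait on a prepared xid. */
    static boolean tryAlter(MetadataLocks locks, String table) {
        return !locks.blocker(table).isPresent();
    }

    public static void main(String[] args) {
        MetadataLocks locks = new MetadataLocks();
        String table = "sink_table"; // hypothetical table name

        // Order described in the report: prepare first, then DDL.
        locks.prepare("xid-1", table);
        System.out.println("DDL after prepare can run: " + tryAlter(locks, table));

        // Alternative order: finish the prepared transaction before the DDL.
        locks.commit("xid-1");
        System.out.println("DDL after commit can run: " + tryAlter(locks, table));
    }
}
```

In this model the first check prints `false` and the second prints `true`. Whether a real fix should commit, roll back, or defer the DDL until the checkpoint's commit phase is a design decision this sketch does not settle.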
   
   
   ### SeaTunnel Version
   
   All versions are affected.
   
   ### SeaTunnel Config
   
   ```conf
   source {
     MySQL-CDC {
       server-id = 5652-5657
       username = "mysqluser"
       password = "mysqlpw"
       table-names = ["shop.products"]
       base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
     }
   }
   
   sink {
     Jdbc {
       url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
       driver = "com.mysql.cj.jdbc.Driver"
       user = "mysqluser"
       password = "mysqlpw"
       generate_sink_sql = true
       database = "shop"
       table = "mysql_cdc_e2e_sink_table_with_schema_change_exactly_once"
       support_upsert_by_query_primary_key_exist = true
       is_exactly_once = true
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   -------
   ```
   
   ### Error Exception
   
   ```log
   **Error Logs:**
   
   2026-03-23 21:06:13,458 INFO  
io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Connected to 
MySQL binlog at localhost:3306, starting at MySqlOffsetContext 
[sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, 
sourceInfo=SourceInfo [currentGtid=null, 
currentBinlogFilename=DESKTOP-HNU0D1U-bin.000008, currentBinlogPosition=6577, 
currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, 
currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, 
transactionContext=TransactionContext [currentTransactionId=null, 
perTableEventCount={}, totalEventCount=0], restartGtidSet=null, 
currentGtidSet=null, restartBinlogFilename=DESKTOP-HNU0D1U-bin.000008, 
restartBinlogPosition=6577, restartRowsToSkip=0, restartEventsToSkip=0, 
currentEventLengthInBytes=0, inTransaction=false, transactionId=null, 
incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, 
chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, 
maximumKey=null]]
   2026-03-23 21:06:13,458 INFO  
io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Waiting for 
keepalive thread to start
   2026-03-23 21:06:13,459 INFO  io.debezium.util.Threads - Creating thread 
debezium-mysqlconnector-mysql_binlog_source-binlog-client
   2026-03-23 21:06:13,499 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement
 - PrepareStatement sql is:
   INSERT INTO 
`shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` (`id`, 
`name`, `description`, `weight`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE 
`id`=VALUES(`id`), `name`=VALUES(`name`), `description`=VALUES(`description`), 
`weight`=VALUES(`weight`)
   
   2026-03-23 21:06:13,503 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement
 - PrepareStatement sql is:
   DELETE FROM 
`shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` WHERE `id` = ?
   
   2026-03-23 21:06:13,562 INFO  
io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Keepalive thread 
is running
   2026-03-23 21:06:13,650 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271173638 for job 
b1fbcd22a40d4a881708ad26b7a86a1b.
   2026-03-23 21:06:13,710 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1 for job b1fbcd22a40d4a881708ad26b7a86a1b (16073 bytes, 
checkpointDuration=68 ms, finalizationTime=4 ms).
   2026-03-23 21:06:13,712 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking 
checkpoint 1 as completed for source Source: MySQL-CDC-Source.
   2026-03-23 21:06:18,638 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271178637 for job 
b1fbcd22a40d4a881708ad26b7a86a1b.
   2026-03-23 21:06:18,648 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 2 for job b1fbcd22a40d4a881708ad26b7a86a1b (17820 bytes, 
checkpointDuration=1 ms, finalizationTime=10 ms).
   2026-03-23 21:06:18,648 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking 
checkpoint 2 as completed for source Source: MySQL-CDC-Source.
   2026-03-23 21:06:18,670 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter 
- commit xid: 
[XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000007f93cd1a9d010000:74eef017,
 attempts=0)]
   2026-03-23 21:06:18,670 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - 
commit 1 transactions
   2026-03-23 21:06:18,670 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - 
committing 
201:3361643061393462656564343466333639393934613065323361323737333635000000007f93cd1a9d010000:74eef017
 transaction
   2026-03-23 21:06:23,644 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 3 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271183643 for job 
b1fbcd22a40d4a881708ad26b7a86a1b.
   2026-03-23 21:06:23,649 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 3 for job b1fbcd22a40d4a881708ad26b7a86a1b (17852 bytes, 
checkpointDuration=6 ms, finalizationTime=0 ms).
   2026-03-23 21:06:23,650 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking 
checkpoint 3 as completed for source Source: MySQL-CDC-Source.
   2026-03-23 21:06:23,650 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter 
- commit xid: 
[XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000008093cd1a9d010000:74eef017,
 attempts=0)]
   2026-03-23 21:06:23,650 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - 
commit 1 transactions
   2026-03-23 21:06:23,650 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - 
committing 
201:3361643061393462656564343466333639393934613065323361323737333635000000008093cd1a9d010000:74eef017
 transaction
   2026-03-23 21:06:28,636 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 4 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271188636 for job 
b1fbcd22a40d4a881708ad26b7a86a1b.
   2026-03-23 21:06:28,643 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 4 for job b1fbcd22a40d4a881708ad26b7a86a1b (17852 bytes, 
checkpointDuration=7 ms, finalizationTime=0 ms).
   2026-03-23 21:06:28,644 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking 
checkpoint 4 as completed for source Source: MySQL-CDC-Source.
   2026-03-23 21:06:28,645 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter 
- commit xid: 
[XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000008193cd1a9d010000:74eef017,
 attempts=0)]
   2026-03-23 21:06:28,645 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - 
commit 1 transactions
   2026-03-23 21:06:28,645 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - 
committing 
201:3361643061393462656564343466333639393934613065323361323737333635000000008193cd1a9d010000:74eef017
 transaction
   2026-03-23 21:06:28,921 WARN  
org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver - 
Ignoring statement for non-captured table ALTER TABLE products_on_hand ADD 
COLUMN add_column5 varchar(64) not null default 'yy'
   2026-03-23 21:06:30,779 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
MessageDelayedEvent(createdTime=1774271190779, 
jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, 
delayTime=2372, 
record=SourceRecord{sourcePartition={server=mysql_binlog_source}, 
sourceOffset={transaction_id=null, ts_sec=1774271188, 
file=DESKTOP-HNU0D1U-bin.000008, pos=9657, server_id=1}} 
ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, 
key=Struct{databaseName=shop}, 
keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, 
value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188407,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=9446,row=0},databaseName=shop,ddl=alter
 table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD 
COLUMN add_column2 int not null default 
1,tableChanges=[Struct{type=ALTER,id="shop"."products",table=Struct{defaultCharsetName=utf8mb4,primaryKeyColumnNames=[id],columns=[Struct{name=id,jdbcType=4,typeName=INT,typeExpression=INT,position=1,optional=false,autoIncremented=true,generated=true},
 
Struct{name=name,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=255,position=2,optional=false,autoIncremented=false,generated=false},
 
Struct{name=description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=512,position=3,optional=true,autoIncremented=false,generated=false},
 
Struct{name=weight,jdbcType=6,typeName=FLOAT,typeExpression=FLOAT,position=4,optional=true,autoIncremented=false,generated=false},
 
Struct{name=add_column1,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=64,position=5,optional=false,autoIncremented=false,generated=false},
 
Struct{name=add_column2,jdbcType=4,typeName=INT,typeExpression=INT,position=6,optional=false,autoIncremented=false,generated=false}]}}]},
 valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, 
timestamp=null, headers=ConnectHeaders(headers=)})
   2026-03-23 21:06:30,779 INFO  
org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
        converter.type = value
        decimal.format = BASE64
        schemas.cache.size = 1000
        schemas.enable = true
   
   2026-03-23 21:06:30,797 WARN  
io.debezium.connector.mysql.MySqlValueConverters - Column is missing a 
character set: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
   2026-03-23 21:06:30,797 WARN  
io.debezium.connector.mysql.MySqlValueConverters - Using UTF-8 charset by 
default for column without charset: add_column1 VARCHAR(64) NOT NULL DEFAULT 
VALUE yy
   2026-03-23 21:06:30,801 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190801,
 tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, 
statement=alter table products ADD COLUMN add_column1 varchar(64) not null 
default 'yy',ADD COLUMN add_column2 int not null default 1, 
sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()})), 
events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190798,
 tableIdentifier=.shop.products, jobId=null, statement=null, 
sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()}))), 
column=PhysicalColumn(super=Column(name=add_column1, dataType=STRING, 
columnLength=256, scale=null, nullable=false, defaultValue=yy, comment=null, 
sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, 
isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, 
afterColumn=null), 
AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190798,
 tableIdentifier=.shop.products, jobId=null, statement=null, 
sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null),
  constraintKeys=[]), options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()}))), 
column=PhysicalColumn(super=Column(name=add_column2, dataType=INT, 
columnLength=null, scale=null, nullable=false, defaultValue=1, comment=null, 
sourceType=INT, sinkType=null, options=null, isUnsigned=false, 
isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, 
afterColumn=null)])
   2026-03-23 21:06:30,919 INFO  
org.apache.seatunnel.translation.flink.schema.SchemaOperator - Starting schema 
change processing for table: .shop.products, job: 
b1fbcd22a40d4a881708ad26b7a86a1b, event time: 1774271190801
   2026-03-23 21:06:30,920 INFO  
org.apache.seatunnel.translation.flink.schema.SchemaOperator - Broadcasting 
SchemaChangeEvent to all downstream sink subtasks for table: .shop.products
   2026-03-23 21:06:30,921 INFO  
org.apache.seatunnel.translation.flink.schema.SchemaOperator - 
SchemaChangeEvent broadcast sent for table: .shop.products
   2026-03-23 21:06:30,921 INFO  
org.apache.seatunnel.translation.flink.schema.SchemaOperator - Synchronously 
processing schema change for table .shop.products (epoch 1774271190801). 
Business data buffered.
   2026-03-23 21:06:30,921 INFO  
org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator
 - Requesting schema change for table .shop.products (epoch 1774271190801). 
Waiting for all 1 sink subtasks to apply after checkpoint completion.
   2026-03-23 21:06:31,026 INFO  
org.apache.seatunnel.translation.flink.schema.BroadcastSchemaSinkOperator - 
Subtask 0 applying schema change immediately for table .shop.products (epoch 
1774271190801, change: AlterTableColumnsEvent). This prevents deadlock by 
allowing checkpoint barriers to propagate.
   2026-03-23 21:06:31,035 INFO  
org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter - FlinkSinkWriter 
applying SchemaChangeEvent for table: .shop.products
   2026-03-23 21:06:31,035 INFO  
org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter - Start apply 
schema change for table shop.products sub-writer 0
   2026-03-23 21:06:31,055 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect - 
Executing add column SQL: ALTER TABLE 
`shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` ADD COLUMN 
`add_column1` VARCHAR(64) NOT NULL DEFAULT 'yy'
   2026-03-23 21:06:31,808 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
MessageDelayedEvent(createdTime=1774271191808, 
jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, 
delayTime=3808, 
record=SourceRecord{sourcePartition={server=mysql_binlog_source}, 
sourceOffset={transaction_id=null, ts_sec=1774271188, 
file=DESKTOP-HNU0D1U-bin.000008, pos=9736, row=1, server_id=1, event=2}} 
ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, 
key=Struct{id=110}, 
keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, 
value=Struct{before=Struct{id=110,name=scooter,description=Small 2-wheel 
scooter,weight=3.140000104904175,add_column1=yy,add_column2=1},after=Struct{id=110,name=dailai,description=Small
 2-wheel 
scooter,weight=3.140000104904175,add_column1=yy,add_column2=1},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188000,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=9891,row=0,thread=29},op=u,ts_ms=1774271188444}, 
valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, 
timestamp=null, headers=ConnectHeaders(headers=)})
   2026-03-23 21:06:31,811 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191811,
 tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, 
statement=alter table products ADD COLUMN add_column3 float not null default 
1.1, sourceDialectName=MySQL, 
changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()})), 
events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191811,
 tableIdentifier=.shop.products, jobId=null, statement=null, 
sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], 
enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, 
connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], 
comment='', catalogName='MySQL', metadata=MetadataSchema()}))), 
column=PhysicalColumn(super=Column(name=add_column3, dataType=FLOAT, 
columnLength=null, scale=null, nullable=false, defaultValue=1.1, comment=null, 
sourceType=FLOAT, sinkType=null, options=null, isUnsigned=false, 
isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, 
afterColumn=null)])
   2026-03-23 21:06:31,824 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191813,
 tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, 
statement=alter table products ADD COLUMN add_column4 timestamp not null 
default current_timestamp(), sourceDialectName=MySQL, 
changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()})), 
events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191813,
 tableIdentifier=.shop.products, jobId=null, statement=null, 
sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()}))), 
column=PhysicalColumn(super=Column(name=add_column4, dataType=TIMESTAMP, 
columnLength=null, scale=0, nullable=false, defaultValue=current_timestamp(), 
comment=null, sourceType=TIMESTAMP, sinkType=null, options=null, 
isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), 
first=false, afterColumn=null)])
   2026-03-23 21:06:32,839 WARN  
io.debezium.connector.mysql.MySqlValueConverters - Column is missing a 
character set: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
   2026-03-23 21:06:32,839 WARN  
io.debezium.connector.mysql.MySqlValueConverters - Using UTF-8 charset by 
default for column without charset: add_column6 VARCHAR(64) NOT NULL DEFAULT 
VALUE ff
   2026-03-23 21:06:32,839 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271192839,
 tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, 
statement=alter table products ADD COLUMN add_column6 varchar(64) not null 
default 'ff' after id, sourceDialectName=MySQL, 
changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()})), 
events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271192839,
 tableIdentifier=.shop.products, jobId=null, statement=null, 
sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, 
tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, 
columnNames=[id], enableAutoId=null), constraintKeys=[]), 
options={table-name=shop.products, connector=jdbc, 
url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', 
catalogName='MySQL', metadata=MetadataSchema()}))), 
column=PhysicalColumn(super=Column(name=add_column6, dataType=STRING, 
columnLength=256, scale=null, nullable=false, defaultValue=ff, comment=null, 
sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, 
isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, 
afterColumn=id)])
   2026-03-23 21:06:33,645 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 5 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271193644 for job 
b1fbcd22a40d4a881708ad26b7a86a1b.
   2026-03-23 21:06:33,853 INFO  
org.apache.seatunnel.api.event.LoggingEventHandler - log event: 
MessageDelayedEvent(createdTime=1774271193853, 
jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, 
delayTime=5853, 
record=SourceRecord{sourcePartition={server=mysql_binlog_source}, 
sourceOffset={transaction_id=null, ts_sec=1774271188, 
file=DESKTOP-HNU0D1U-bin.000008, pos=13677, row=1, server_id=1, event=2}} 
ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, 
key=Struct{id=115}, 
keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, 
value=Struct{before=Struct{id=115,add_column6=ff,name=hammer,description=16oz 
carpenter's 
hammer,weight=1.0,add_column1=yy,add_column2=1,add_column3=1.100000023841858,add_column4=2026-03-23T13:06:28Z},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188000,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=13831,row=0,thread=29},op=d,ts_ms=1774271188491}, 
valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, 
timestamp=null, headers=ConnectHeaders(headers=)})
   # Job hangs here indefinitely, no further checkpoint progress
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

