yoheimuta commented on PR #3876:
URL: https://github.com/apache/flink-cdc/pull/3876#issuecomment-2606494441

Here is the current difference in DataChangeEvent between renaming a single table and renaming multiple tables:

The first log comes from renaming a single table in a single statement. The second log is one of the DataChangeEvents generated when multiple tables are renamed in a single statement, specifically the event relevant to this case. The comparison focuses on these two logs to highlight how `source.table` differs between the two scenarios.

<details>
<summary>diff</summary>

```diff
$ diff rename_single.txt rename_multi.txt
6c6
< ts_sec=1737529154,
---
> ts_sec=1737425209,
8,9c8,9
< pos=1054,
< gtids=5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46,
---
> pos=1798,
> gtids=5891eef6-d79c-11ef-b2d3-0242ac110004:44-48,
17c17
< databaseName=customer_1ttteik
---
> databaseName=customer_1nkreoe
27,29c27,29
< ts_ms=1737529154419,
< db=customer_1ttteik,
< table=customers,
---
> ts_ms=1737425209617,
> db=customer_1nkreoe,
> table=customers_old,customers,
31c31
< gtid=5cb040cc-d88e-11ef-aaf7-0242ac110004:47,
---
> gtid=5891eef6-d79c-11ef-b2d3-0242ac110004:49,
33c33
< pos=898,
---
> pos=1580,
39c39
< "pos": 898,
---
> "pos": 1580,
44c44
< "ts_sec": 1737529154,
---
> "ts_sec": 1737425209,
46,47c46,47
< "pos": 1054,
< "gtids": "5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46",
---
> "pos": 1798,
> "gtids": "5891eef6-d79c-11ef-b2d3-0242ac110004:44-48",
50,51c50,51
< "databaseName": "customer_1ttteik",
< "ddl": "RENAME TABLE customer_1ttteik.customers_copy TO customer_1ttteik.customers",
---
> "databaseName": "customer_1nkreoe",
> "ddl": "RENAME TABLE customer_1nkreoe.customers_copy TO customer_1nkreoe.customers",
55,56c55,56
< "id": "\"customer_1ttteik\".\"customers\"",
< "previousId": "\"customer_1ttteik\".\"customers_copy\"",
---
> "id": "\"customer_1nkreoe\".\"customers\"",
> "previousId": "\"customer_1nkreoe\".\"customers_copy\"",
```

```
$ cat rename_single.txt
DataChangeEvent [
  record=SourceRecord {
    sourcePartition={server=mysql_binlog_source},
    sourceOffset={
      transaction_id=null,
      ts_sec=1737529154,
      file=mysql-bin.000004,
      pos=1054,
      gtids=5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46,
      server_id=223344
    }
  },
  ConnectRecord {
    topic='mysql_binlog_source',
    kafkaPartition=0,
    key=Struct {
      databaseName=customer_1ttteik
    },
    keySchema=Schema {
      io.debezium.connector.mysql.SchemaChangeKey:STRUCT
    },
    value=Struct {
      source=Struct {
        version=1.9.8.Final,
        connector=mysql,
        name=mysql_binlog_source,
        ts_ms=1737529154419,
        db=customer_1ttteik,
        table=customers,
        server_id=223344,
        gtid=5cb040cc-d88e-11ef-aaf7-0242ac110004:47,
        file=mysql-bin.000004,
        pos=898,
        row=0
      },
      historyRecord={
        "source": {
          "file": "mysql-bin.000004",
          "pos": 898,
          "server_id": 223344
        },
        "position": {
          "transaction_id": null,
          "ts_sec": 1737529154,
          "file": "mysql-bin.000004",
          "pos": 1054,
          "gtids": "5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46",
          "server_id": 223344
        },
        "databaseName": "customer_1ttteik",
        "ddl": "RENAME TABLE customer_1ttteik.customers_copy TO customer_1ttteik.customers",
        "tableChanges": [
          {
            "type": "ALTER",
            "id": "\"customer_1ttteik\".\"customers\"",
            "previousId": "\"customer_1ttteik\".\"customers_copy\"",
            "table": {
              "defaultCharsetName": "latin1",
              "primaryKeyColumnNames": ["id"],
              "columns": [
                {
                  "name": "id",
                  "jdbcType": 4,
                  "typeName": "INT",
                  "typeExpression": "INT",
                  "charsetName": null,
                  "length": 11,
                  "position": 1,
                  "optional": false,
                  "autoIncremented": false,
                  "generated": false,
                  "comment": null,
                  "hasDefaultValue": false,
                  "enumValues": []
                },
                {
                  "name": "name",
                  "jdbcType": 12,
                  "typeName": "VARCHAR",
                  "typeExpression": "VARCHAR",
                  "charsetName": "latin1",
                  "length": 255,
                  "position": 2,
                  "optional": false,
                  "autoIncremented": false,
                  "generated": false,
                  "comment": null,
                  "hasDefaultValue": true,
                  "defaultValueExpression": "flink",
                  "enumValues": []
                },
                {
                  "name": "address",
                  "jdbcType": 12,
                  "typeName": "VARCHAR",
                  "typeExpression": "VARCHAR",
                  "charsetName": "latin1",
                  "length": 1024,
                  "position": 3,
                  "optional": true,
                  "autoIncremented": false,
                  "generated": false,
                  "comment": null,
                  "hasDefaultValue": true,
                  "enumValues": []
                }
              ]
            },
            "comment": null
          }
        ]
      }
    },
    valueSchema=Schema {
      io.debezium.connector.mysql.SchemaChangeValue:STRUCT
    },
    timestamp=null,
    headers=ConnectHeaders(headers=)
  }
]
```

```
$ cat rename_multi.txt
DataChangeEvent [
  record=SourceRecord {
    sourcePartition={server=mysql_binlog_source},
    sourceOffset={
      transaction_id=null,
      ts_sec=1737425209,
      file=mysql-bin.000004,
      pos=1798,
      gtids=5891eef6-d79c-11ef-b2d3-0242ac110004:44-48,
      server_id=223344
    }
  },
  ConnectRecord {
    topic='mysql_binlog_source',
    kafkaPartition=0,
    key=Struct {
      databaseName=customer_1nkreoe
    },
    keySchema=Schema {
      io.debezium.connector.mysql.SchemaChangeKey:STRUCT
    },
    value=Struct {
      source=Struct {
        version=1.9.8.Final,
        connector=mysql,
        name=mysql_binlog_source,
        ts_ms=1737425209617,
        db=customer_1nkreoe,
        table=customers_old,customers,
        server_id=223344,
        gtid=5891eef6-d79c-11ef-b2d3-0242ac110004:49,
        file=mysql-bin.000004,
        pos=1580,
        row=0
      },
      historyRecord={
        "source": {
          "file": "mysql-bin.000004",
          "pos": 1580,
          "server_id": 223344
        },
        "position": {
          "transaction_id": null,
          "ts_sec": 1737425209,
          "file": "mysql-bin.000004",
          "pos": 1798,
          "gtids": "5891eef6-d79c-11ef-b2d3-0242ac110004:44-48",
          "server_id": 223344
        },
        "databaseName": "customer_1nkreoe",
        "ddl": "RENAME TABLE customer_1nkreoe.customers_copy TO customer_1nkreoe.customers",
        "tableChanges": [
          {
            "type": "ALTER",
            "id": "\"customer_1nkreoe\".\"customers\"",
            "previousId": "\"customer_1nkreoe\".\"customers_copy\"",
            "table": {
              "defaultCharsetName": "latin1",
              "primaryKeyColumnNames": ["id"],
              "columns": [
                {
                  "name": "id",
                  "jdbcType": 4,
                  "typeName": "INT",
                  "typeExpression": "INT",
                  "charsetName": null,
                  "length": 11,
                  "position": 1,
                  "optional": false,
                  "autoIncremented": false,
                  "generated": false,
                  "comment": null,
                  "hasDefaultValue": false,
                  "enumValues": []
                },
                {
                  "name": "name",
                  "jdbcType": 12,
                  "typeName": "VARCHAR",
                  "typeExpression": "VARCHAR",
                  "charsetName": "latin1",
                  "length": 255,
                  "position": 2,
                  "optional": false,
                  "autoIncremented": false,
                  "generated": false,
                  "comment": null,
                  "hasDefaultValue": true,
                  "defaultValueExpression": "flink",
                  "enumValues": []
                },
                {
                  "name": "address",
                  "jdbcType": 12,
                  "typeName": "VARCHAR",
                  "typeExpression": "VARCHAR",
                  "charsetName": "latin1",
                  "length": 1024,
                  "position": 3,
                  "optional": true,
                  "autoIncremented": false,
                  "generated": false,
                  "comment": null,
                  "hasDefaultValue": true,
                  "enumValues": []
                }
              ]
            },
            "comment": null
          }
        ]
      }
    },
    valueSchema=Schema {
      io.debezium.connector.mysql.SchemaChangeValue:STRUCT
    },
    timestamp=null,
    headers=ConnectHeaders(headers=)
  }
]
```

</details>

Excluding random values such as timestamps, positions, GTIDs, and database names, the only meaningful difference lies in the value of `source.table`.

```diff
- table=customers,
+ table=customers_old,customers,
```

In the case of renaming a single table, `source.table` contains one table name. However, for renaming multiple tables in a single statement, `source.table` includes multiple table names, separated by commas.

This behavior was observed from logs printed during the BinlogSplitReader execution, which shows how the DataChangeEvent differs in these scenarios.

```diff
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index cd3c697e..49f567a9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -165,6 +165,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
+                LOG.info("Read binlog event: {}", event);
                 if (isParsingOnLineSchemaChanges) {
                     Optional<SourceRecord> oscRecord =
                             parseOnLineSchemaChangeEvent(event.getRecord());
```
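
To make the comma-separated `source.table` value described above concrete, here is a minimal sketch of how such a value could be split into individual table names. `SourceTableNames` and `split` are hypothetical names used only for illustration; they are not part of Flink CDC or Debezium, and the sketch assumes that table names themselves never contain commas.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink CDC or Debezium. */
final class SourceTableNames {

    private SourceTableNames() {}

    /**
     * Splits a raw source.table value such as "customers_old,customers"
     * into its individual table names.
     * Assumption: table names do not contain commas.
     */
    static List<String> split(String sourceTable) {
        List<String> names = new ArrayList<>();
        if (sourceTable == null) {
            return names; // no table information on the event
        }
        for (String part : sourceTable.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }
}
```

With the two values from the logs above, `SourceTableNames.split("customers")` yields a one-element list, while `SourceTableNames.split("customers_old,customers")` yields `customers_old` and `customers` as separate entries.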