yoheimuta commented on PR #3876:
URL: https://github.com/apache/flink-cdc/pull/3876#issuecomment-2606494441

   Here is the current difference in DataChangeEvent between renaming a single 
table and renaming multiple tables:
   
   The first log comes from renaming a single table in a single statement. The 
second log is one of the DataChangeEvents generated when multiple tables are 
renamed in a single statement, specifically the event relevant to this case. 
The comparison focuses on these two logs to highlight how `source.table` 
differs between the two scenarios.
   
   <details>
   <summary>diff</summary>
   
   ```diff
   $ diff rename_single.txt rename_multi.txt
   6c6
   <             ts_sec=1737529154,
   ---
   >             ts_sec=1737425209,
   8,9c8,9
   <             pos=1054,
   <             gtids=5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46,
   ---
   >             pos=1798,
   >             gtids=5891eef6-d79c-11ef-b2d3-0242ac110004:44-48,
   17c17
   <             databaseName=customer_1ttteik
   ---
   >             databaseName=customer_1nkreoe
   27,29c27,29
   <                 ts_ms=1737529154419,
   <                 db=customer_1ttteik,
   <                 table=customers,
   ---
   >                 ts_ms=1737425209617,
   >                 db=customer_1nkreoe,
   >                 table=customers_old,customers,
   31c31
   <                 gtid=5cb040cc-d88e-11ef-aaf7-0242ac110004:47,
   ---
   >                 gtid=5891eef6-d79c-11ef-b2d3-0242ac110004:49,
   33c33
   <                 pos=898,
   ---
   >                 pos=1580,
   39c39
   <                     "pos": 898,
   ---
   >                     "pos": 1580,
   44c44
   <                     "ts_sec": 1737529154,
   ---
   >                     "ts_sec": 1737425209,
   46,47c46,47
   <                     "pos": 1054,
   <                     "gtids": "5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46",
   ---
   >                     "pos": 1798,
   >                     "gtids": "5891eef6-d79c-11ef-b2d3-0242ac110004:44-48",
   50,51c50,51
   <                 "databaseName": "customer_1ttteik",
   <                 "ddl": "RENAME TABLE customer_1ttteik.customers_copy TO 
customer_1ttteik.customers",
   ---
   >                 "databaseName": "customer_1nkreoe",
   >                 "ddl": "RENAME TABLE customer_1nkreoe.customers_copy TO 
customer_1nkreoe.customers",
   55,56c55,56
   <                         "id": "\"customer_1ttteik\".\"customers\"",
   <                         "previousId": 
"\"customer_1ttteik\".\"customers_copy\"",
   ---
   >                         "id": "\"customer_1nkreoe\".\"customers\"",
   >                         "previousId": 
"\"customer_1nkreoe\".\"customers_copy\"",
   
   ```
   
   ```
   $ cat rename_single.txt
   DataChangeEvent [
       record=SourceRecord {
           sourcePartition={server=mysql_binlog_source},
           sourceOffset={
               transaction_id=null,
               ts_sec=1737529154,
               file=mysql-bin.000004,
               pos=1054,
               gtids=5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46,
               server_id=223344
           }
       },
       ConnectRecord {
           topic='mysql_binlog_source',
           kafkaPartition=0,
           key=Struct {
               databaseName=customer_1ttteik
           },
           keySchema=Schema {
               io.debezium.connector.mysql.SchemaChangeKey:STRUCT
           },
           value=Struct {
               source=Struct {
                   version=1.9.8.Final,
                   connector=mysql,
                   name=mysql_binlog_source,
                   ts_ms=1737529154419,
                   db=customer_1ttteik,
                   table=customers,
                   server_id=223344,
                   gtid=5cb040cc-d88e-11ef-aaf7-0242ac110004:47,
                   file=mysql-bin.000004,
                   pos=898,
                   row=0
               },
               historyRecord={
                   "source": {
                       "file": "mysql-bin.000004",
                       "pos": 898,
                       "server_id": 223344
                   },
                   "position": {
                       "transaction_id": null,
                       "ts_sec": 1737529154,
                       "file": "mysql-bin.000004",
                       "pos": 1054,
                       "gtids": "5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46",
                       "server_id": 223344
                   },
                   "databaseName": "customer_1ttteik",
                   "ddl": "RENAME TABLE customer_1ttteik.customers_copy TO 
customer_1ttteik.customers",
                   "tableChanges": [
                       {
                           "type": "ALTER",
                           "id": "\"customer_1ttteik\".\"customers\"",
                           "previousId": 
"\"customer_1ttteik\".\"customers_copy\"",
                           "table": {
                               "defaultCharsetName": "latin1",
                               "primaryKeyColumnNames": ["id"],
                               "columns": [
                                   {
                                       "name": "id",
                                       "jdbcType": 4,
                                       "typeName": "INT",
                                       "typeExpression": "INT",
                                       "charsetName": null,
                                       "length": 11,
                                       "position": 1,
                                       "optional": false,
                                       "autoIncremented": false,
                                       "generated": false,
                                       "comment": null,
                                       "hasDefaultValue": false,
                                       "enumValues": []
                                   },
                                   {
                                       "name": "name",
                                       "jdbcType": 12,
                                       "typeName": "VARCHAR",
                                       "typeExpression": "VARCHAR",
                                       "charsetName": "latin1",
                                       "length": 255,
                                       "position": 2,
                                       "optional": false,
                                       "autoIncremented": false,
                                       "generated": false,
                                       "comment": null,
                                       "hasDefaultValue": true,
                                       "defaultValueExpression": "flink",
                                       "enumValues": []
                                   },
                                   {
                                       "name": "address",
                                       "jdbcType": 12,
                                       "typeName": "VARCHAR",
                                       "typeExpression": "VARCHAR",
                                       "charsetName": "latin1",
                                       "length": 1024,
                                       "position": 3,
                                       "optional": true,
                                       "autoIncremented": false,
                                       "generated": false,
                                       "comment": null,
                                       "hasDefaultValue": true,
                                       "enumValues": []
                                   }
                               ]
                           },
                           "comment": null
                       }
                   ]
               }
           },
           valueSchema=Schema {
               io.debezium.connector.mysql.SchemaChangeValue:STRUCT
           },
           timestamp=null,
           headers=ConnectHeaders(headers=)
       }
   ]
   
   ```
   
   ```
   $ cat rename_multi.txt
   DataChangeEvent [
       record=SourceRecord {
           sourcePartition={server=mysql_binlog_source},
           sourceOffset={
               transaction_id=null,
               ts_sec=1737425209,
               file=mysql-bin.000004,
               pos=1798,
               gtids=5891eef6-d79c-11ef-b2d3-0242ac110004:44-48,
               server_id=223344
           }
       },
       ConnectRecord {
           topic='mysql_binlog_source',
           kafkaPartition=0,
           key=Struct {
               databaseName=customer_1nkreoe
           },
           keySchema=Schema {
               io.debezium.connector.mysql.SchemaChangeKey:STRUCT
           },
           value=Struct {
               source=Struct {
                   version=1.9.8.Final,
                   connector=mysql,
                   name=mysql_binlog_source,
                   ts_ms=1737425209617,
                   db=customer_1nkreoe,
                   table=customers_old,customers,
                   server_id=223344,
                   gtid=5891eef6-d79c-11ef-b2d3-0242ac110004:49,
                   file=mysql-bin.000004,
                   pos=1580,
                   row=0
               },
               historyRecord={
                   "source": {
                       "file": "mysql-bin.000004",
                       "pos": 1580,
                       "server_id": 223344
                   },
                   "position": {
                       "transaction_id": null,
                       "ts_sec": 1737425209,
                       "file": "mysql-bin.000004",
                       "pos": 1798,
                       "gtids": "5891eef6-d79c-11ef-b2d3-0242ac110004:44-48",
                       "server_id": 223344
                   },
                   "databaseName": "customer_1nkreoe",
                   "ddl": "RENAME TABLE customer_1nkreoe.customers_copy TO 
customer_1nkreoe.customers",
                   "tableChanges": [
                       {
                           "type": "ALTER",
                           "id": "\"customer_1nkreoe\".\"customers\"",
                           "previousId": 
"\"customer_1nkreoe\".\"customers_copy\"",
                           "table": {
                               "defaultCharsetName": "latin1",
                               "primaryKeyColumnNames": ["id"],
                               "columns": [
                                   {
                                       "name": "id",
                                       "jdbcType": 4,
                                       "typeName": "INT",
                                       "typeExpression": "INT",
                                       "charsetName": null,
                                       "length": 11,
                                       "position": 1,
                                       "optional": false,
                                       "autoIncremented": false,
                                       "generated": false,
                                       "comment": null,
                                       "hasDefaultValue": false,
                                       "enumValues": []
                                   },
                                   {
                                       "name": "name",
                                       "jdbcType": 12,
                                       "typeName": "VARCHAR",
                                       "typeExpression": "VARCHAR",
                                       "charsetName": "latin1",
                                       "length": 255,
                                       "position": 2,
                                       "optional": false,
                                       "autoIncremented": false,
                                       "generated": false,
                                       "comment": null,
                                       "hasDefaultValue": true,
                                       "defaultValueExpression": "flink",
                                       "enumValues": []
                                   },
                                   {
                                       "name": "address",
                                       "jdbcType": 12,
                                       "typeName": "VARCHAR",
                                       "typeExpression": "VARCHAR",
                                       "charsetName": "latin1",
                                       "length": 1024,
                                       "position": 3,
                                       "optional": true,
                                       "autoIncremented": false,
                                       "generated": false,
                                       "comment": null,
                                       "hasDefaultValue": true,
                                       "enumValues": []
                                   }
                               ]
                           },
                           "comment": null
                       }
                   ]
               }
           },
           valueSchema=Schema {
               io.debezium.connector.mysql.SchemaChangeValue:STRUCT
           },
           timestamp=null,
           headers=ConnectHeaders(headers=)
       }
   ]
   ```
   </details>
   
   
   Excluding random values such as timestamps, positions, GTIDs, and database 
names, the only meaningful difference lies in the value of source.table.
   
   ```diff
   - table=customers,
   + table=customers_old,customers,
   ```
   
   In the case of renaming a single table, the source.table contains one table 
name. However, for renaming multiple tables in a single statement, the 
source.table includes multiple table names, separated by commas.
   
   This behavior was observed from logs printed during the BinlogSplitReader 
execution, which shows how the DataChangeEvent differs in these scenarios.
   
   ```diff
   diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
   index cd3c697e..49f567a9 100644
   --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
   +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
   @@ -165,6 +165,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
            if (currentTaskRunning) {
                List<DataChangeEvent> batch = queue.poll();
                for (DataChangeEvent event : batch) {
   +                LOG.info("Read binlog event: {}", event);
                    if (isParsingOnLineSchemaChanges) {
                        Optional<SourceRecord> oscRecord =
                                parseOnLineSchemaChangeEvent(event.getRecord());
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to