yuxiqian commented on code in PR #3876:
URL: https://github.com/apache/flink-cdc/pull/3876#discussion_r1944638776


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java:
##########
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti
             String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
 
             Struct value = new Struct(schemaChangeValueSchema);
-            value.put(HistoryRecord.Fields.SOURCE, event.getSource());
+            value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
             value.put(HISTORY_RECORD_FIELD, historyStr);
             return value;
         }
 
+        /**
+         * Rewrites the table name in the Source if needed to handle schema changes properly.
+         *
+         * <p>This method addresses a specific issue when renaming multiple tables within a single
+         * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
+         * customers;}.
+         *
+         * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
+         * emits two separate change events:
+         *
+         * <ul>
+         *   <li>{@code RENAME TABLE customers TO customers_old}
+         *   <li>{@code RENAME TABLE customers_copy TO customers}
+         * </ul>
+         *
+         * <p>Both events share a table name of {@code customers, customers_old} in their source
+         * info, which includes multiple table IDs in a single string.
+         *
+         * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
+         * schema change:
+         *
+         * <ul>
+         *   <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
+         *       customers} ID.
+         *   <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
+         * </ul>
+         *
+         * <p>The problem arises because {@link
+         * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
+         * multiple table IDs in the source info. As a result, changes for tables defined by the
+         * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
+         * This can lead to schema changes not being saved in the state, which is crucial for
+         * recovering the job from a snapshot.
+         *
+         * <p>To resolve this issue, this method:
+         *
+         * <ol>
+         *   <li>Checks if the source info contains multiple table names.
+         *   <li>Verifies if the {@code TableChange#id} matches one of the table names.
+         *   <li>Updates the source info with the correct table name that conforms to Flink CDC
+         *       expectations, ensuring the schema change is saved correctly.
+         * </ol>
+         *
+         * @param event the schema change event emitted by Debezium.
+         * @return the updated source info with the corrected table name if necessary.
+         */
+        private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
+            Struct sourceInfo = event.getSource();
+            String tableName = sourceInfo.getString(TABLE_NAME_KEY);
+            if (tableName == null || tableName.isEmpty()) {
+                return sourceInfo;
+            }
+
+            List<String> tableNames = parseTableNames(tableName);
+            if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {
+                for (TableChanges.TableChange tableChange : event.getTableChanges()) {

Review Comment:
   IIUC, the tricky DDL is `RENAME TABLE A TO A_old, A_copy TO A`, for which Debezium generates two `SchemaChangeEvent`s: `Rename(A => A_old)` and `Rename(A_copy => A)`. Both of them carry `A,A_old` in the `TABLE_NAME_KEY` field.
   
   However, it seems the rewrite logic will not work for the first event (`Rename(A => A_old)`): `A_old` is excluded from the capturing list, so it is ignored when the `SchemaChangeEvent` is constructed [[1]](https://github.com/debezium/debezium/blob/c88a467e91a7acd0dd535c076a337439e23147ed/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java#L309-L316), [[2]](https://github.com/debezium/debezium/blob/3a93f02b4cbdc111557e5b5f7837b59005abfea8/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java#L100-L102), and `getTableChanges` will always return an empty set [[3]](https://github.com/debezium/debezium/blob/c88a467e91a7acd0dd535c076a337439e23147ed/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java#L44-L45). Is this behavior intended?
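
   To make the concern concrete, here is a minimal sketch in plain Java with no Debezium classes. `FakeEvent` is a hypothetical stand-in for `SchemaChangeEvent`, and the loop body is an assumption based on the Javadoc in the diff (replace the table name when a `TableChange` id matches one of the parsed names), since the actual body is not visible in this hunk. The comma-splitting `parseTableNames` is likewise a guess at the PR's helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RenameRewriteSketch {

    /** Hypothetical stand-in for SchemaChangeEvent: only the fields the rewrite reads. */
    static final class FakeEvent {
        final String ddl;
        final String sourceTableName;      // value stored under TABLE_NAME_KEY
        final List<String> tableChangeIds; // ids of the attached TableChanges, may be empty

        FakeEvent(String ddl, String sourceTableName, List<String> tableChangeIds) {
            this.ddl = ddl;
            this.sourceTableName = sourceTableName;
            this.tableChangeIds = tableChangeIds;
        }
    }

    /** Assumed behavior: splits "A,A_old" into [A, A_old]. */
    static List<String> parseTableNames(String tableName) {
        List<String> names = new ArrayList<>();
        for (String part : tableName.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    /** Mirrors the shape of rewriteTableNameIfNeeded, returning just the table name. */
    static String rewriteTableNameIfNeeded(FakeEvent event) {
        String tableName = event.sourceTableName;
        if (tableName == null || tableName.isEmpty()) {
            return tableName;
        }
        List<String> tableNames = parseTableNames(tableName);
        if (tableNames.size() >= 2 && event.ddl.toLowerCase().startsWith("rename")) {
            // With no table changes attached, this loop body never executes.
            for (String id : event.tableChangeIds) {
                if (tableNames.contains(id)) {
                    return id;
                }
            }
        }
        return tableName;
    }

    public static void main(String[] args) {
        // First event: A_old is not captured, so no TableChange is attached.
        FakeEvent first =
                new FakeEvent("RENAME TABLE A TO A_old", "A,A_old", Collections.emptyList());
        // Second event: the TableChange carries the id of the captured table A.
        FakeEvent second =
                new FakeEvent("RENAME TABLE A_copy TO A", "A,A_old", Arrays.asList("A"));

        System.out.println(rewriteTableNameIfNeeded(first));  // prints "A,A_old"
        System.out.println(rewriteTableNameIfNeeded(second)); // prints "A"
    }
}
```

   Under these assumptions, the first event keeps the combined name `A,A_old` and only the second is rewritten to `A`, which is the case the question above is about.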



