yuxiqian commented on code in PR #3876:
URL: https://github.com/apache/flink-cdc/pull/3876#discussion_r1944638776
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java:
##########
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti
         String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
         Struct value = new Struct(schemaChangeValueSchema);
-        value.put(HistoryRecord.Fields.SOURCE, event.getSource());
+        value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
         value.put(HISTORY_RECORD_FIELD, historyStr);
         return value;
     }
+    /**
+     * Rewrites the table name in the Source if needed to handle schema changes properly.
+     *
+     * <p>This method addresses a specific issue when renaming multiple tables within a single
+     * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
+     * customers;}.
+     *
+     * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
+     * emits two separate change events:
+     *
+     * <ul>
+     * <li>{@code RENAME TABLE customers TO customers_old}
+     * <li>{@code RENAME TABLE customers_copy TO customers}
+     * </ul>
+     *
+     * <p>Both events share a table name of {@code customers, customers_old} in their source
+     * info, which includes multiple table IDs in a single string.
+     *
+     * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
+     * schema change:
+     *
+     * <ul>
+     * <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
+     * customers} ID.
+     * <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
+     * </ul>
+     *
+     * <p>The problem arises because {@link
+     * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
+     * multiple table IDs in the source info. As a result, changes for tables defined by the
+     * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
+     * This can lead to schema changes not being saved in the state, which is crucial for
+     * recovering the job from a snapshot.
+     *
+     * <p>To resolve this issue, this method:
+     *
+     * <ol>
+     * <li>Checks if the source info contains multiple table names.
+     * <li>Verifies if the {@code TableChange#id} matches one of the table names.
+     * <li>Updates the source info with the correct table name that conforms to Flink CDC
+     * expectations, ensuring the schema change is saved correctly.
+     * </ol>
+     *
+     * @param event the schema change event emitted by Debezium.
+     * @return the updated source info with the corrected table name if necessary.
+     */
+    private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
+        Struct sourceInfo = event.getSource();
+        String tableName = sourceInfo.getString(TABLE_NAME_KEY);
+        if (tableName == null || tableName.isEmpty()) {
+            return sourceInfo;
+        }
+
+        List<String> tableNames = parseTableNames(tableName);
+        if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {
+            for (TableChanges.TableChange tableChange : event.getTableChanges()) {

Review Comment:
IIUC the tricky DDL event is `RENAME TABLE A TO A_old, A_copy TO A`, and Debezium would generate two `SchemaChangeEvent`s, `Rename(A => A_old)` and `Rename(A_copy => A)`. Both of them carry `A,A_old` in the `TABLE_NAME_KEY` field.

But it seems the rewrite logic will not work for the former event (`Rename(A => A_old)`): `A_old` is excluded from the capturing list, so it is ignored when constructing the `SchemaChangeEvent` [[1]](https://github.com/debezium/debezium/blob/c88a467e91a7acd0dd535c076a337439e23147ed/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java#L309-L316), [[2]](https://github.com/debezium/debezium/blob/3a93f02b4cbdc111557e5b5f7837b59005abfea8/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java#L100-L102), and `getTableChanges` will always return an empty set for it [[3]](https://github.com/debezium/debezium/blob/c88a467e91a7acd0dd535c076a337439e23147ed/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java#L44-L45). Is this behavior intended?
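
To make the concern concrete, here is a minimal, self-contained sketch of the two events. It does not use the Debezium or Flink CDC classes: `RenameRewriteSketch`, `Event`, and the plain-string table IDs are hypothetical stand-ins, and the loop body is only my assumption of what the PR does, based on the steps listed in the Javadoc (match a table change ID against the parsed table names).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the rewrite logic; not the actual Debezium or Flink CDC API. */
public class RenameRewriteSketch {

    /** Simplified schema change event: DDL text, comma-joined table name, changed table IDs. */
    static final class Event {
        final String ddl;
        final String tableName;
        final List<String> changedTableIds;

        Event(String ddl, String tableName, List<String> changedTableIds) {
            this.ddl = ddl;
            this.tableName = tableName;
            this.changedTableIds = changedTableIds;
        }
    }

    /** Returns the rewritten table name, or the original one when no table change matches. */
    static String rewriteTableNameIfNeeded(Event event) {
        List<String> tableNames = new ArrayList<>();
        for (String name : event.tableName.split(",")) {
            tableNames.add(name.trim());
        }
        if (tableNames.size() >= 2 && event.ddl.toLowerCase().startsWith("rename")) {
            // Assumed loop body: pick the first changed table ID that appears in the name list.
            for (String id : event.changedTableIds) {
                if (tableNames.contains(id)) {
                    return id;
                }
            }
        }
        // With no table changes, the loop never runs and the name is left untouched.
        return event.tableName;
    }

    public static void main(String[] args) {
        // Rename(A => A_old): A_old is not captured, so the table changes are assumed empty.
        Event first =
                new Event("RENAME TABLE A TO A_old", "A,A_old", Collections.<String>emptyList());
        // Rename(A_copy => A): the table change carries the ID of the captured table A.
        Event second = new Event("RENAME TABLE A_copy TO A", "A,A_old", Arrays.asList("A"));

        System.out.println(rewriteTableNameIfNeeded(first));  // prints "A,A_old"
        System.out.println(rewriteTableNameIfNeeded(second)); // prints "A"
    }
}
```

Under these assumptions the first event keeps the combined name `A,A_old`, so whatever filtering happens downstream still sees a multi-table string for it.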