yoheimuta commented on code in PR #3876: URL: https://github.com/apache/flink-cdc/pull/3876#discussion_r1945968557
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlRenameTablesSchemaFixITCase.java: ########## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.TestTable; +import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.FlinkRuntimeException; + +import io.debezium.connector.mysql.MySqlConnection; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import 
org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.io.File; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.UUID; + +/** + * Integration tests for handling schema changes regard to renaming multiple tables within a single + * statement. + */ +public class MySqlRenameTablesSchemaFixITCase { Review Comment: Commit https://github.com/apache/flink-cdc/pull/3876/commits/4fb3b7da3f4e94d252a1982dcea6b197ca3f7e8e renamed the class. ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java: ########## @@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti String historyStr = DOCUMENT_WRITER.write(historyRecord.document()); Struct value = new Struct(schemaChangeValueSchema); - value.put(HistoryRecord.Fields.SOURCE, event.getSource()); + value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event)); value.put(HISTORY_RECORD_FIELD, historyStr); return value; } + /** + * Rewrites the table name in the Source if needed to handle schema changes properly. + * + * <p>This method addresses a specific issue when renaming multiple tables within a single + * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO + * customers;}. 
+ * + * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema} + * emits two separate change events: + * + * <ul> + * <li>{@code RENAME TABLE customers TO customers_old} + * <li>{@code RENAME TABLE customers_copy TO customers} + * </ul> + * + * <p>Both events share a table name of {@code customers, customers_old} in their source + * info, which includes multiple table IDs in a single string. + * + * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the + * schema change: + * + * <ul> + * <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code + * customers} ID. + * <li>The change for {@code RENAME TABLE customers TO customers_old} is empty. + * </ul> + * + * <p>The problem arises because {@link + * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect + * multiple table IDs in the source info. As a result, changes for tables defined by the + * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally. + * This can lead to schema changes not being saved in the state, which is crucial for + * recovering the job from a snapshot. + * + * <p>To resolve this issue, this method: + * + * <ol> + * <li>Checks if the source info contains multiple table names. + * <li>Verifies if the {@code TableChange#id} matches one of the table names. + * <li>Updates the source info with the correct table name that conforms to Flink CDC + * expectations, ensuring the schema change is saved correctly. + * </ol> + * + * @param event the schema change event emitted by Debezium. + * @return the updated source info with the corrected table name if necessary. 
+ */ + private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) { + Struct sourceInfo = event.getSource(); + String tableName = sourceInfo.getString(TABLE_NAME_KEY); + if (tableName == null || tableName.isEmpty()) { + return sourceInfo; + } + + List<String> tableNames = parseTableNames(tableName); + if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) { + for (TableChanges.TableChange tableChange : event.getTableChanges()) { Review Comment: Yes, the behavior is intentional. We don't need to handle events related to `A_old` since it's not part of the capturing list. To ensure schema changes and data ingestion keep up correctly, it's sufficient to process rename events for tables like `A` that are in the capturing list. This approach avoids unnecessary processing for tables we're not actively monitoring. ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java: ########## @@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti String historyStr = DOCUMENT_WRITER.write(historyRecord.document()); Struct value = new Struct(schemaChangeValueSchema); - value.put(HistoryRecord.Fields.SOURCE, event.getSource()); + value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event)); value.put(HISTORY_RECORD_FIELD, historyStr); return value; } + /** + * Rewrites the table name in the Source if needed to handle schema changes properly. + * + * <p>This method addresses a specific issue when renaming multiple tables within a single + * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO + * customers;}. 
+ * + * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema} + * emits two separate change events: + * + * <ul> + * <li>{@code RENAME TABLE customers TO customers_old} + * <li>{@code RENAME TABLE customers_copy TO customers} + * </ul> + * + * <p>Both events share a table name of {@code customers, customers_old} in their source + * info, which includes multiple table IDs in a single string. + * + * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the + * schema change: + * + * <ul> + * <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code + * customers} ID. + * <li>The change for {@code RENAME TABLE customers TO customers_old} is empty. + * </ul> + * + * <p>The problem arises because {@link + * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect + * multiple table IDs in the source info. As a result, changes for tables defined by the + * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally. + * This can lead to schema changes not being saved in the state, which is crucial for + * recovering the job from a snapshot. + * + * <p>To resolve this issue, this method: + * + * <ol> + * <li>Checks if the source info contains multiple table names. + * <li>Verifies if the {@code TableChange#id} matches one of the table names. + * <li>Updates the source info with the correct table name that conforms to Flink CDC + * expectations, ensuring the schema change is saved correctly. + * </ol> + * + * @param event the schema change event emitted by Debezium. + * @return the updated source info with the corrected table name if necessary. 
+ */ + private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) { + Struct sourceInfo = event.getSource(); + String tableName = sourceInfo.getString(TABLE_NAME_KEY); + if (tableName == null || tableName.isEmpty()) { + return sourceInfo; + } + + List<String> tableNames = parseTableNames(tableName); + if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) { Review Comment: The rewrite logic explicitly checks that the DDL starts with `"RENAME"` using this condition: ```java if (...event.getDdl().toLowerCase().startsWith("rename")) ``` So, other DDL events like `DROP TABLE` won’t be affected by this method. ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlRenameTablesSchemaFixITCase.java: ########## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.TestTable; +import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.FlinkRuntimeException; + +import io.debezium.connector.mysql.MySqlConnection; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import 
org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.io.File; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.UUID; + +/** + * Integration tests for handling schema changes regard to renaming multiple tables within a single + * statement. + */ +public class MySqlRenameTablesSchemaFixITCase { + private static final Logger LOG = + LoggerFactory.getLogger(MySqlRenameTablesSchemaFixITCase.class); + @RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension(); + + @SuppressWarnings("unchecked") + private final MySqlContainer mysql = Review Comment: Commit https://github.com/apache/flink-cdc/pull/3876/commits/a29d93b3c2ee7fb27690040e5bb79f2b6ec6339f made the container static. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org