yoheimuta commented on code in PR #3876:
URL: https://github.com/apache/flink-cdc/pull/3876#discussion_r1945968557


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlRenameTablesSchemaFixITCase.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Integration tests for handling schema changes regarding renaming multiple tables within a single
+ * statement.
+ */
+public class MySqlRenameTablesSchemaFixITCase {

Review Comment:
   https://github.com/apache/flink-cdc/pull/3876/commits/4fb3b7da3f4e94d252a1982dcea6b197ca3f7e8e renamed the class.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java:
##########
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOException
             String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
 
             Struct value = new Struct(schemaChangeValueSchema);
-            value.put(HistoryRecord.Fields.SOURCE, event.getSource());
+            value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
             value.put(HISTORY_RECORD_FIELD, historyStr);
             return value;
         }
 
+        /**
+         * Rewrites the table name in the Source if needed to handle schema changes properly.
+         *
+         * <p>This method addresses a specific issue when renaming multiple tables within a single
+         * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
+         * customers;}.
+         *
+         * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
+         * emits two separate change events:
+         *
+         * <ul>
+         *   <li>{@code RENAME TABLE customers TO customers_old}
+         *   <li>{@code RENAME TABLE customers_copy TO customers}
+         * </ul>
+         *
+         * <p>Both events share a table name of {@code customers, customers_old} in their source
+         * info, which includes multiple table IDs in a single string.
+         *
+         * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
+         * schema change:
+         *
+         * <ul>
+         *   <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
+         *       customers} ID.
+         *   <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
+         * </ul>
+         *
+         * <p>The problem arises because {@link
+         * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
+         * multiple table IDs in the source info. As a result, changes for tables defined by the
+         * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
+         * This can lead to schema changes not being saved in the state, which is crucial for
+         * recovering the job from a snapshot.
+         *
+         * <p>To resolve this issue, this method:
+         *
+         * <ol>
+         *   <li>Checks if the source info contains multiple table names.
+         *   <li>Verifies if the {@code TableChange#id} matches one of the table names.
+         *   <li>Updates the source info with the correct table name that conforms to Flink CDC
+         *       expectations, ensuring the schema change is saved correctly.
+         * </ol>
+         *
+         * @param event the schema change event emitted by Debezium.
+         * @return the updated source info with the corrected table name if necessary.
+         */
+        private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
+            Struct sourceInfo = event.getSource();
+            String tableName = sourceInfo.getString(TABLE_NAME_KEY);
+            if (tableName == null || tableName.isEmpty()) {
+                return sourceInfo;
+            }
+
+            List<String> tableNames = parseTableNames(tableName);
+            if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {
+                for (TableChanges.TableChange tableChange : event.getTableChanges()) {

Review Comment:
   Yes, the behavior is intentional. We don't need to handle events related to `A_old` since it's not part of the capturing list.
   
   To ensure schema changes and data ingestion keep up correctly, it's sufficient to process rename events for tables like `A` that are in the capturing list. This approach avoids unnecessary processing for tables we're not actively monitoring.
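   
   As an illustration of that skip-vs-process decision, here is a minimal, self-contained sketch; `shouldProcessRename` and the hard-coded capture list are hypothetical stand-ins for the connector's table filter configuration, not the PR's actual API:
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   
   public class CaptureFilterSketch {
       // Hypothetical stand-in for the table filter configuration ("capturing list").
       private static final List<String> CAPTURED_TABLES = Arrays.asList("customers");
   
       // A rename target is processed only if it is in the capturing list;
       // renames that only touch uncaptured tables (e.g. customers_old) are skipped.
       static boolean shouldProcessRename(String renamedTo) {
           return CAPTURED_TABLES.contains(renamedTo);
       }
   
       public static void main(String[] args) {
           System.out.println(shouldProcessRename("customers"));     // true  -> rewrite and record
           System.out.println(shouldProcessRename("customers_old")); // false -> safe to ignore
       }
   }
   ```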



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java:
##########
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOException
             String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
 
             Struct value = new Struct(schemaChangeValueSchema);
-            value.put(HistoryRecord.Fields.SOURCE, event.getSource());
+            value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
             value.put(HISTORY_RECORD_FIELD, historyStr);
             return value;
         }
 
+        /**
+         * Rewrites the table name in the Source if needed to handle schema changes properly.
+         *
+         * <p>This method addresses a specific issue when renaming multiple tables within a single
+         * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
+         * customers;}.
+         *
+         * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
+         * emits two separate change events:
+         *
+         * <ul>
+         *   <li>{@code RENAME TABLE customers TO customers_old}
+         *   <li>{@code RENAME TABLE customers_copy TO customers}
+         * </ul>
+         *
+         * <p>Both events share a table name of {@code customers, customers_old} in their source
+         * info, which includes multiple table IDs in a single string.
+         *
+         * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
+         * schema change:
+         *
+         * <ul>
+         *   <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
+         *       customers} ID.
+         *   <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
+         * </ul>
+         *
+         * <p>The problem arises because {@link
+         * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
+         * multiple table IDs in the source info. As a result, changes for tables defined by the
+         * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
+         * This can lead to schema changes not being saved in the state, which is crucial for
+         * recovering the job from a snapshot.
+         *
+         * <p>To resolve this issue, this method:
+         *
+         * <ol>
+         *   <li>Checks if the source info contains multiple table names.
+         *   <li>Verifies if the {@code TableChange#id} matches one of the table names.
+         *   <li>Updates the source info with the correct table name that conforms to Flink CDC
+         *       expectations, ensuring the schema change is saved correctly.
+         * </ol>
+         *
+         * @param event the schema change event emitted by Debezium.
+         * @return the updated source info with the corrected table name if necessary.
+         */
+        private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
+            Struct sourceInfo = event.getSource();
+            String tableName = sourceInfo.getString(TABLE_NAME_KEY);
+            if (tableName == null || tableName.isEmpty()) {
+                return sourceInfo;
+            }
+
+            List<String> tableNames = parseTableNames(tableName);
+            if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {

Review Comment:
   The rewrite logic explicitly checks that the DDL starts with `"RENAME"` using this condition:
   
   ```java
   if (...event.getDdl().toLowerCase().startsWith("rename"))
   ```
   
   So, other DDL events like `DROP TABLE` won’t be affected by this method.
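   
   To make the guard's effect concrete, here is a runnable sketch applying the same predicate to a few sample DDL strings; the helper name `isRenameDdl` is ours, only the condition itself comes from the diff:
   
   ```java
   public class RenameGuardSketch {
       // Same predicate as in the diff: only DDL that begins with "rename"
       // (case-insensitively) is considered for table-name rewriting.
       static boolean isRenameDdl(String ddl) {
           return ddl.toLowerCase().startsWith("rename");
       }
   
       public static void main(String[] args) {
           System.out.println(isRenameDdl("RENAME TABLE customers TO customers_old")); // true
           System.out.println(isRenameDdl("DROP TABLE customers_old"));                // false
           System.out.println(isRenameDdl("ALTER TABLE customers ADD COLUMN c INT"));  // false
       }
   }
   ```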



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlRenameTablesSchemaFixITCase.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Integration tests for handling schema changes regarding renaming multiple tables within a single
+ * statement.
+ */
+public class MySqlRenameTablesSchemaFixITCase {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MySqlRenameTablesSchemaFixITCase.class);
+    @RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension();
+
+    @SuppressWarnings("unchecked")
+    private final MySqlContainer mysql =

Review Comment:
   https://github.com/apache/flink-cdc/pull/3876/commits/a29d93b3c2ee7fb27690040e5bb79f2b6ec6339f updated the container to be static.
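   
   For reference, a minimal sketch of the static-container lifecycle that change adopts, shown here with the stock Testcontainers `MySQLContainer` rather than the project's own `MySqlContainer` test util (an assumption for the sake of a self-contained example):
   
   ```java
   import org.junit.jupiter.api.AfterAll;
   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.BeforeAll;
   import org.junit.jupiter.api.Test;
   import org.testcontainers.containers.MySQLContainer;
   
   class StaticContainerSketch {
       // Declared static so the container starts once per test class,
       // not once per test method or test instance.
       private static final MySQLContainer<?> MYSQL = new MySQLContainer<>("mysql:8.0");
   
       @BeforeAll
       static void startContainer() {
           MYSQL.start();
       }
   
       @AfterAll
       static void stopContainer() {
           MYSQL.stop();
       }
   
       @Test
       void containerIsShared() {
           // All tests in this class reuse the same running MySQL instance.
           Assertions.assertTrue(MYSQL.isRunning());
       }
   }
   ```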



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
