This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4e4d2b8ee5 [Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (#7046)
4e4d2b8ee5 is described below

commit 4e4d2b8ee5fab005886e22e4481e10462d67fa6f
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Tue Jun 25 13:36:30 2024 +0800

    [Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (#7046)
---
 .../config/CustomMySqlConnectionConfiguration.java | 56 ++++++++++++++++++++++
 .../reader/fetch/MySqlSourceFetchTaskContext.java  |  4 +-
 .../cdc/mysql/utils/MySqlConnectionUtils.java      |  4 +-
 .../seatunnel/cdc/mysql/utils/MySqlUtils.java      | 46 ------------------
 .../src/test/resources/ddl/mysql_cdc.sql           |  4 +-
 5 files changed, 63 insertions(+), 51 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java
new file mode 100644
index 0000000000..4e8c2d1eb8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import static io.debezium.connector.mysql.MySqlConnectorConfig.JDBC_DRIVER;
+
+public class CustomMySqlConnectionConfiguration
+        extends MySqlConnection.MySqlConnectionConfiguration {
+
+    protected static final String URL_PATTERN =
+            
"jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
+
+    private final JdbcConnection.ConnectionFactory connectionFactory;
+
+    public CustomMySqlConnectionConfiguration(Configuration config) {
+        super(config);
+        String driverClassName =
+                config.getString(JDBC_DRIVER.name(), 
JDBC_DRIVER.defaultValueAsString());
+        connectionFactory =
+                JdbcConnection.patternBasedFactory(
+                        URL_PATTERN, driverClassName, 
getClass().getClassLoader());
+    }
+
+    @Override
+    public JdbcConnection.ConnectionFactory factory() {
+        return new JdbcConnection.ConnectionFactory() {
+            @Override
+            public Connection connect(JdbcConfiguration config) throws 
SQLException {
+                return connectionFactory.connect(config);
+            }
+        };
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index 932fb7ef35..a67bc30dcc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
 
 import org.apache.kafka.connect.data.Struct;
@@ -120,7 +121,8 @@ public class MySqlSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
         this.topicSelector = 
MySqlTopicSelector.defaultSelector(connectorConfig);
 
         this.databaseSchema =
-                MySqlUtils.createMySqlDatabaseSchema(connectorConfig, 
tableIdCaseInsensitive);
+                MySqlConnectionUtils.createMySqlDatabaseSchema(
+                        connectorConfig, tableIdCaseInsensitive);
         this.offsetContext =
                 loadStartingOffsetState(
                         new MySqlOffsetContext.Loader(connectorConfig), 
sourceSplitBase);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
index 3a63c5d090..d38553677c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
 
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.CustomMySqlConnectionConfiguration;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
@@ -44,8 +45,7 @@ public class MySqlConnectionUtils {
 
     /** Creates a new {@link MySqlConnection}, but not open the connection. */
     public static MySqlConnection createMySqlConnection(Configuration 
dbzConfiguration) {
-        return new MySqlConnection(
-                new 
MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
+        return new MySqlConnection(new 
CustomMySqlConnectionConfiguration(dbzConfiguration));
     }
 
     /** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 032f185d0c..cff223f676 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -24,19 +24,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogO
 
 import org.apache.kafka.connect.source.SourceRecord;
 
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import io.debezium.connector.mysql.MySqlDatabaseSchema;
-import io.debezium.connector.mysql.MySqlTopicSelector;
-import io.debezium.connector.mysql.MySqlValueConverters;
 import io.debezium.jdbc.JdbcConnection;
-import io.debezium.jdbc.JdbcValueConverters;
-import io.debezium.jdbc.TemporalPrecisionMode;
 import io.debezium.relational.Column;
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
-import io.debezium.schema.TopicSelector;
-import io.debezium.util.SchemaNameAdjuster;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -342,44 +334,6 @@ public class MySqlUtils {
         return getSplitType(primaryKeys.get(0), dbzConnectorConfig);
     }
 
-    /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql 
database schemas. */
-    public static MySqlDatabaseSchema createMySqlDatabaseSchema(
-            MySqlConnectorConfig dbzMySqlConfig, boolean 
isTableIdCaseSensitive) {
-        TopicSelector<TableId> topicSelector = 
MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
-        SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
-        MySqlValueConverters valueConverters = 
getValueConverters(dbzMySqlConfig);
-        return new MySqlDatabaseSchema(
-                dbzMySqlConfig,
-                valueConverters,
-                topicSelector,
-                schemaNameAdjuster,
-                isTableIdCaseSensitive);
-    }
-
-    private static MySqlValueConverters 
getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
-        TemporalPrecisionMode timePrecisionMode = 
dbzMySqlConfig.getTemporalPrecisionMode();
-        JdbcValueConverters.DecimalMode decimalMode = 
dbzMySqlConfig.getDecimalMode();
-        String bigIntUnsignedHandlingModeStr =
-                dbzMySqlConfig
-                        .getConfig()
-                        
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
-        MySqlConnectorConfig.BigIntUnsignedHandlingMode 
bigIntUnsignedHandlingMode =
-                MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
-                        bigIntUnsignedHandlingModeStr);
-        JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
-                bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
-
-        boolean timeAdjusterEnabled =
-                
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
-        return new MySqlValueConverters(
-                decimalMode,
-                timePrecisionMode,
-                bigIntUnsignedMode,
-                dbzMySqlConfig.binaryHandlingMode(),
-                timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x 
-> x,
-                MySqlValueConverters::defaultParsingErrorHandler);
-    }
-
     public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
         return getBinlogPosition(dataRecord.sourceOffset());
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index b909f9aacd..1103634162 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -50,7 +50,7 @@ CREATE TABLE mysql_cdc_e2e_source_table
     `f_mediumtext`         mediumtext,
     `f_text`               text,
     `f_tinytext`           tinytext,
-    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_varchar`            varchar(100) collate gbk_bin   DEFAULT NULL,
     `f_date`               date                           DEFAULT NULL,
     `f_datetime`           datetime                       DEFAULT NULL,
     `f_timestamp`          timestamp NULL                 DEFAULT NULL,
@@ -333,7 +333,7 @@ VALUES ( 1, 
0x616263740000000000000000000000000000000000000000000000000000000000
          0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
          0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 
654321, 1234567, 7654321, 1234567, 7654321,
          123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long 
text field', 'This is a medium text field',
-         'This is a text field', 'This is a tiny text field', 'This is a 
varchar field', '2022-04-27', '2022-04-27 14:30:00',
+         'This is a text field', 'This is a tiny text field', '中文测试', 
'2022-04-27', '2022-04-27 14:30:00',
          '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
          
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
          12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),

Reply via email to