This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 4e4d2b8ee5 [Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (#7046) 4e4d2b8ee5 is described below commit 4e4d2b8ee5fab005886e22e4481e10462d67fa6f Author: hailin0 <wanghai...@apache.org> AuthorDate: Tue Jun 25 13:36:30 2024 +0800 [Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (#7046) --- .../config/CustomMySqlConnectionConfiguration.java | 56 ++++++++++++++++++++++ .../reader/fetch/MySqlSourceFetchTaskContext.java | 4 +- .../cdc/mysql/utils/MySqlConnectionUtils.java | 4 +- .../seatunnel/cdc/mysql/utils/MySqlUtils.java | 46 ------------------ .../src/test/resources/ddl/mysql_cdc.sql | 4 +- 5 files changed, 63 insertions(+), 51 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java new file mode 100644 index 0000000000..4e8c2d1eb8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/CustomMySqlConnectionConfiguration.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config; + +import io.debezium.config.Configuration; +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; + +import java.sql.Connection; +import java.sql.SQLException; + +import static io.debezium.connector.mysql.MySqlConnectorConfig.JDBC_DRIVER; + +public class CustomMySqlConnectionConfiguration + extends MySqlConnection.MySqlConnectionConfiguration { + + protected static final String URL_PATTERN = + "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}"; + + private final JdbcConnection.ConnectionFactory connectionFactory; + + public CustomMySqlConnectionConfiguration(Configuration config) { + super(config); + String driverClassName = + config.getString(JDBC_DRIVER.name(), JDBC_DRIVER.defaultValueAsString()); + connectionFactory = + JdbcConnection.patternBasedFactory( + URL_PATTERN, driverClassName, getClass().getClassLoader()); + } + + @Override + public JdbcConnection.ConnectionFactory factory() { + return new JdbcConnection.ConnectionFactory() { + @Override + public Connection connect(JdbcConfiguration config) throws SQLException { + return connectionFactory.connect(config); + } + }; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index 932fb7ef35..a67bc30dcc 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils; import org.apache.kafka.connect.data.Struct; @@ -120,7 +121,8 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig); this.databaseSchema = - MySqlUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive); + MySqlConnectionUtils.createMySqlDatabaseSchema( + connectorConfig, tableIdCaseInsensitive); this.offsetContext = loadStartingOffsetState( new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java index 3a63c5d090..d38553677c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils; import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.CustomMySqlConnectionConfiguration; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset; import com.github.shyiko.mysql.binlog.BinaryLogClient; @@ -44,8 +45,7 @@ public class MySqlConnectionUtils { /** Creates a new {@link MySqlConnection}, but not open the connection. */ public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) { - return new MySqlConnection( - new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration)); + return new MySqlConnection(new CustomMySqlConnectionConfiguration(dbzConfiguration)); } /** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */ diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index 032f185d0c..cff223f676 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java @@ -24,19 +24,11 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogO import org.apache.kafka.connect.source.SourceRecord; -import io.debezium.connector.mysql.MySqlConnectorConfig; -import io.debezium.connector.mysql.MySqlDatabaseSchema; -import io.debezium.connector.mysql.MySqlTopicSelector; -import io.debezium.connector.mysql.MySqlValueConverters; import io.debezium.jdbc.JdbcConnection; -import io.debezium.jdbc.JdbcValueConverters; -import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.schema.TopicSelector; -import io.debezium.util.SchemaNameAdjuster; import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -342,44 +334,6 @@ public class MySqlUtils { return getSplitType(primaryKeys.get(0), dbzConnectorConfig); } - /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */ - public static MySqlDatabaseSchema createMySqlDatabaseSchema( - MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) { - TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig); - SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); - MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig); - return new MySqlDatabaseSchema( - dbzMySqlConfig, - valueConverters, - topicSelector, - schemaNameAdjuster, - isTableIdCaseSensitive); - } - - private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) { - TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode(); - JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode(); - String bigIntUnsignedHandlingModeStr = - dbzMySqlConfig - .getConfig() - .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE); - MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode = - MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse( - bigIntUnsignedHandlingModeStr); - JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode = - bigIntUnsignedHandlingMode.asBigIntUnsignedMode(); - - boolean timeAdjusterEnabled = - dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER); - return new MySqlValueConverters( - decimalMode, - timePrecisionMode, - bigIntUnsignedMode, - dbzMySqlConfig.binaryHandlingMode(), - timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x, - MySqlValueConverters::defaultParsingErrorHandler); - } - public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) { return getBinlogPosition(dataRecord.sourceOffset()); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql index b909f9aacd..1103634162 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql @@ -50,7 +50,7 @@ CREATE TABLE mysql_cdc_e2e_source_table `f_mediumtext` mediumtext, `f_text` text, `f_tinytext` tinytext, - `f_varchar` varchar(100) DEFAULT NULL, + `f_varchar` varchar(100) collate gbk_bin DEFAULT NULL, `f_date` date DEFAULT NULL, `f_datetime` datetime DEFAULT NULL, `f_timestamp` timestamp NULL DEFAULT NULL, @@ -333,7 +333,7 @@ VALUES ( 1, 0x616263740000000000000000000000000000000000000000000000000000000000 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', - 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', + 'This is a text field', 'This is a tiny text field', '中文测试', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),