This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a3b3fc7f70 [INLONG-8967][Sort] Add Mysql connector on flink 1.15 (#8980)
a3b3fc7f70 is described below

commit a3b3fc7f70cb1e78335888daede75ebb44d62ca4
Author: EpicMo <1982742...@qq.com>
AuthorDate: Sat Oct 7 09:50:50 2023 +0800

    [INLONG-8967][Sort] Add Mysql connector on flink 1.15 (#8980)
---
 .../src/main/assemblies/sort-connectors-v1.15.xml  |   8 +
 inlong-sort/sort-core/pom.xml                      |   6 +
 .../sort-end-to-end-tests-v1.15/pom.xml            |   8 +
 .../apache/inlong/sort/tests/MysqlToRocksTest.java | 170 +++++++
 .../inlong/sort/tests/PostgresToStarRocksTest.java |  88 +---
 .../sort/tests/utils/FlinkContainerTestEnv.java    |   1 +
 .../inlong/sort/tests/utils/MySqlContainer.java    |   8 +-
 .../sort/tests/utils/StarRocksContainer.java       |   5 -
 .../inlong/sort/tests/utils/StarRocksManager.java  |  79 +++
 .../src/test/resources/docker/mysql/my.cnf         |   3 +-
 .../src/test/resources/docker/mysql/setup.sql      |   3 +-
 .../src/test/resources/flinkSql/mysql_test.sql     |  38 ++
 .../sort-connectors/mysql-cdc/pom.xml              | 178 +++++++
 .../inlong/sort/mysql/MysqlTableFactory.java       | 537 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 .../sort/formats/json/utils/FormatJsonUtil.java    |   1 +
 17 files changed, 1072 insertions(+), 78 deletions(-)

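For orientation before the diff: the commit registers the Flink SQL factory identifier 'mysql-cdc-inlong' (declared in MysqlTableFactory below). A minimal usage sketch against the Flink 1.15 Table API — hostname, credentials, and table names are placeholder values, and only the required options from requiredOptions() plus 'port' are set:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlCdcInlongQuickstart {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'mysql-cdc-inlong' is the factoryIdentifier() of the new MysqlTableFactory;
        // all connection values below are placeholders.
        tEnv.executeSql(
                "CREATE TABLE mysql_source (\n"
                        + "  `id` INT,\n"
                        + "  name STRING,\n"
                        + "  description STRING,\n"
                        + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'root',\n"
                        + "  'password' = 'inlong',\n"
                        + "  'database-name' = 'test',\n"
                        + "  'table-name' = 'test_input1'\n"
                        + ")");
        tEnv.executeSql("SELECT * FROM mysql_source").print();
    }
}

The end-to-end test below drives the same DDL from flinkSql/mysql_test.sql, sinking into StarRocks instead of printing.
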
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index f61fbc0e9e..184e727a91 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -51,5 +51,13 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            <directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                <include>sort-connector-mysql-v1.15-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
     </fileSets>
 </assembly>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 0d2626b748..b18604b67d 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -257,6 +257,12 @@
                     <version>${project.version}</version>
                     <scope>test</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
             <build>
                 <plugins>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 00cc1803ce..6841331770 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -165,6 +165,14 @@
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-mysql-cdc.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
                         <artifactItem>
                             <groupId>org.apache.inlong</groupId>
                             <artifactId>sort-connector-starrocks-v1.15</artifactId>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
new file mode 100644
index 0000000000..e02501cfd1
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
+import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.StarRocksContainer;
+import org.apache.inlong.sort.tests.utils.StarRocksManager;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+
+/**
+ * End-to-end tests for sort-connector-mysql-cdc-v1.15 uber jar.
+ * Test flink sql Mysql cdc to StarRocks
+ */
+public class MysqlToRocksTest extends FlinkContainerTestEnv {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksTest.class);
+
+    private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
+    private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
+    private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+    private static final String sqlFile;
+    private static final String sqlFile;
+
+    static {
+        try {
+            sqlFile =
+                    Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
+            StarRocksManager.buildStarRocksImage();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClassRule
+    public static StarRocksContainer STAR_ROCKS =
+            (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
+                    .withExposedPorts(9030, 8030, 8040)
+                    .withNetwork(NETWORK)
+                    .withAccessToHost(true)
+                    .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+                    .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+
+    @ClassRule
+    public static final MySqlContainer MYSQL_CONTAINER =
+            (MySqlContainer) new MySqlContainer(MySqlContainer.MySqlVersion.V8_0)
+                    .withDatabaseName("test")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("mysql")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @Before
+    public void setup() {
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        initializeMysqlTable();
+        initializeStarRocksTable(STAR_ROCKS);
+    }
+
+    private void initializeMysqlTable() {
+        try {
+            Class.forName(MYSQL_CONTAINER.getDriverClassName());
+            Connection conn = DriverManager
+                    .getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(),
+                            MYSQL_CONTAINER.getPassword());
+            Statement stat = conn.createStatement();
+            stat.execute(
+                    "CREATE TABLE test_input1 (\n"
+                            + "  id SERIAL,\n"
+                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+                            + "  description VARCHAR(512),\n"
+                            + "  PRIMARY  KEY(id)\n"
+                            + ");");
+            stat.close();
+            conn.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.stop();
+        }
+        if (STAR_ROCKS != null) {
+            STAR_ROCKS.stop();
+        }
+    }
+
+    /**
+     * Test flink sql mysql cdc to StarRocks
+     *
+     * @throws Exception The exception may be thrown when executing the case
+     */
+    @Test
+    public void testMysqlUpdateAndDelete() throws Exception {
+        submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+        waitUntilJobRunning(Duration.ofSeconds(10));
+
+        // generate input
+        try (Connection conn =
+                DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.execute(
+                    "INSERT INTO test_input1 "
+                            + "VALUES (1,'jacket','water resistent white wind breaker');");
+            stat.execute(
+                    "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');");
+            stat.execute(
+                    "update test_input1 set name = 'tom' where id = 2;");
+            stat.execute(
+                    "delete from test_input1 where id = 1;");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        JdbcProxy proxy =
+                new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
+                        STAR_ROCKS.getPassword(),
+                        STAR_ROCKS.getDriverClassName());
+        List<String> expectResult =
+                Arrays.asList("2,tom,Big 2-wheel scooter ");
+        proxy.checkResultWithTimeout(
+                expectResult,
+                "test_output1",
+                3,
+                60000L);
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
index 499cc4b7f2..5c7208a7f6 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
@@ -28,12 +28,9 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.MountableFile;
 
 import java.net.URISyntaxException;
 import java.nio.file.Path;
@@ -45,8 +42,12 @@ import java.sql.Statement;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Stream;
 
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
 /**
  * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
  * Test flink sql Postgres cdc to StarRocks
@@ -58,72 +59,42 @@ public class PostgresToStarRocksTest extends FlinkContainerTestEnv {
     private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar");
     private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
     private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
-
-    private static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class);
-
     private static final String sqlFile;
 
-    // ----------------------------------------------------------------------------------------
-    // StarRocks Variables
-    // ----------------------------------------------------------------------------------------
-    private static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks";
-    private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks";
-    private static final String NEW_STARROCKS_TAG = "latest";
-    private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4";
-
     static {
         try {
-            sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()).toString();
+            sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI())
+                    .toString();
             buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private static String getNewStarRocksImageName() {
-        return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG;
-    }
-
-    public static void buildStarRocksImage() {
-        GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME);
-        Startables.deepStart(Stream.of(oldStarRocks)).join();
-        oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
-                "/data/deploy/");
-        try {
-            oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        oldStarRocks.getDockerClient()
-                .commitCmd(oldStarRocks.getContainerId())
-                .withRepository(NEW_STARROCKS_REPOSITORY)
-                .withTag(NEW_STARROCKS_TAG).exec();
-        oldStarRocks.stop();
-    }
-
     @ClassRule
-    public static StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
-            .withExposedPorts(9030, 8030, 8040)
-            .withNetwork(NETWORK)
-            .withAccessToHost(true)
-            .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
-            .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+    public static StarRocksContainer STAR_ROCKS =
+            (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
+                    .withExposedPorts(9030, 8030, 8040)
+                    .withNetwork(NETWORK)
+                    .withAccessToHost(true)
+                    .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+                    .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
     @ClassRule
     public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer(
             DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"))
-            .withUsername("flinkuser")
-            .withPassword("flinkpw")
-            .withDatabaseName("test")
-            .withNetwork(NETWORK)
-            .withNetworkAliases("postgres")
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
+                    .withUsername("flinkuser")
+                    .withPassword("flinkpw")
+                    .withDatabaseName("test")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("postgres")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
 
     @Before
     public void setup() {
         waitUntilJobRunning(Duration.ofSeconds(30));
         initializePostgresTable();
-        initializeStarRocksTable();
+        initializeStarRocksTable(STAR_ROCKS);
     }
 
     private void initializePostgresTable() {
@@ -149,23 +120,6 @@ public class PostgresToStarRocksTest extends FlinkContainerTestEnv {
         }
     }
 
-    private void initializeStarRocksTable() {
-        try (Connection conn =
-                DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
-                        STAR_ROCKS.getPassword());
-                Statement stat = conn.createStatement()) {
-            stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n"
-                    + "       id INT NOT NULL,\n"
-                    + "       name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
-                    + "       description VARCHAR(512)\n"
-                    + ")\n"
-                    + "PRIMARY KEY(id)\n"
-                    + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\" = \"1\");");
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @AfterClass
     public static void teardown() {
         if (POSTGRES_CONTAINER != null) {
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index c328ac951e..2426c57ae4 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -117,6 +117,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
                         .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
                         .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
                         .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withExposedPorts(JOB_MANAGER_REST_PORT)
                         .withLogConsumer(new Slf4jLogConsumer(JM_LOG));
         taskManager =
                 new GenericContainer<>("flink:1.15.4-scala_2.12")
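One detail worth flagging in the hunk above: Testcontainers' withExposedPorts(...) replaces the previously configured port list rather than appending to it, so the added call leaves only JOB_MANAGER_REST_PORT exposed and effectively drops DEBUG_PORT from the earlier withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) call.
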
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
index 11635e5619..75ba466f1d 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
@@ -49,7 +49,7 @@ public class MySqlContainer extends JdbcDatabaseContainer {
 
     public MySqlContainer(MySqlVersion version) {
         super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
-        addExposedPort(MYSQL_PORT);
+        addFixedExposedPort(33306, 3306);
     }
 
     @Override
@@ -60,8 +60,6 @@ public class MySqlContainer extends JdbcDatabaseContainer {
     @Override
     protected void configure() {
         // HERE is the difference, copy to /etc/mysql/, if copy to /etc/mysql/conf.d will be wrong
-        optionallyMapResourceParameterAsVolume(
-                MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
 
         if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
             optionallyMapResourceParameterAsVolume(
@@ -79,6 +77,7 @@ public class MySqlContainer extends JdbcDatabaseContainer {
             throw new ContainerLaunchException(
                     "Empty password can be used only with the root user");
         }
+        withCommand("--default-authentication-plugin=mysql_native_password");
         setStartupAttempts(3);
     }
 
@@ -100,7 +99,8 @@ public class MySqlContainer extends JdbcDatabaseContainer {
                 + getDatabasePort()
                 + "/"
                 + databaseName
-                + additionalUrlParams;
+                + additionalUrlParams
+                + "?useSSL=false&allowPublicKeyRetrieval=true";
     }
 
     @Override
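
In practice the MySqlContainer changes above mean the container is always reachable on fixed host port 33306 and the JDBC URL carries the SSL/auth parameters; a brief sketch (the database name "test" is a placeholder):

MySqlContainer mysql =
        (MySqlContainer) new MySqlContainer(MySqlContainer.MySqlVersion.V8_0)
                .withDatabaseName("test");
mysql.start();
// getJdbcUrl() now resolves to something like:
// jdbc:mysql://localhost:33306/test?useSSL=false&allowPublicKeyRetrieval=true
String jdbcUrl = mysql.getJdbcUrl();
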
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
index 47f0d673c8..4db7538058 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
@@ -19,14 +19,10 @@ package org.apache.inlong.sort.tests.utils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.JdbcDatabaseContainer;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import static java.util.stream.Collectors.joining;
 
@@ -103,7 +99,6 @@ public class StarRocksContainer extends GenericContainer {
         return getMappedPort(STAR_ROCKS_QUERY_PORT);
     }
 
-
     public String getDatabaseName() {
         return databaseName;
     }
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
new file mode 100644
index 0000000000..3c60a4b3eb
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class StarRocksManager {
+
+    // ----------------------------------------------------------------------------------------
+    // StarRocks Variables
+    // ----------------------------------------------------------------------------------------
+    public static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks";
+    private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks";
+    private static final String NEW_STARROCKS_TAG = "latest";
+    private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4";
+    public static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class);
+    public static void buildStarRocksImage() {
+        GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME);
+        Startables.deepStart(Stream.of(oldStarRocks)).join();
+        oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
+                "/data/deploy/");
+        try {
+            oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        oldStarRocks.getDockerClient()
+                .commitCmd(oldStarRocks.getContainerId())
+                .withRepository(NEW_STARROCKS_REPOSITORY)
+                .withTag(NEW_STARROCKS_TAG).exec();
+        oldStarRocks.stop();
+    }
+
+    public static String getNewStarRocksImageName() {
+        return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG;
+    }
+
+    public static void initializeStarRocksTable(StarRocksContainer STAR_ROCKS) {
+        try (Connection conn =
+                DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
+                        STAR_ROCKS.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n"
+                    + "       id INT NOT NULL,\n"
+                    + "       name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+                    + "       description VARCHAR(512)\n"
+                    + ")\n"
+                    + "PRIMARY KEY(id)\n"
+                    + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\" = \"1\");");
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
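
StarRocksManager consolidates the image-building and table-setup code removed from PostgresToStarRocksTest so both e2e tests can share it; a condensed sketch of the intended call order, mirroring the @ClassRule/@Before wiring in MysqlToRocksTest above:

// Commit a patched image from starrocks/allin1-ubi:3.0.4 (run once, e.g. in a static block).
StarRocksManager.buildStarRocksImage();

// Start a container from the committed image under the shared network alias.
StarRocksContainer starRocks =
        (StarRocksContainer) new StarRocksContainer(StarRocksManager.getNewStarRocksImageName())
                .withNetworkAliases(StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS);
starRocks.start();

// Create the shared test_output1 sink table before each test.
StarRocksManager.initializeStarRocksTable(starRocks);
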
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
index 87a492c496..bbf52cd615 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
@@ -60,4 +60,5 @@ binlog_format     = row
 
 # enable gtid mode
 gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
+enforce_gtid_consistency = on
+default_authentication_plugin=mysql_native_password
\ No newline at end of file
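
(MySQL 8.0 defaults to the caching_sha2_password authentication plugin; pinning mysql_native_password here, together with the container command flag above and the ALTER USER in setup.sql below, is what lets the JDBC driver and Debezium authenticate without extra RSA/TLS setup — hence also allowPublicKeyRetrieval=true in the JDBC URL.)
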
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
index 9ec4b48bbd..73c4be16a7 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
@@ -22,4 +22,5 @@
 --
 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES  ON *.* TO 'flinkuser'@'%';
 CREATE USER 'inlong' IDENTIFIED BY 'inlong';
-GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%';
+ALTER USER 'root'@'%' IDENTIFIED WITH 'mysql_native_password' BY 'inlong';
+FLUSH PRIVILEGES;
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
new file mode 100644
index 0000000000..9f74d54ae7
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
@@ -0,0 +1,38 @@
+CREATE TABLE test_input1 (
+    `id` INT primary key,
+    name STRING,
+    description STRING
+) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'hostname' = 'mysql',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = 'inlong',
+    'database-name' = 'test',
+    'table-name' = 'test_input1',
+    'scan.incremental.snapshot.enabled' = 'false',
+    'jdbc.properties.useSSL' = 'false',
+    'jdbc.properties.allowPublicKeyRetrieval' = 'true'
+    );
+
+CREATE TABLE test_output1 (
+      `id` INT primary key,
+      name STRING,
+      description STRING
+) WITH (
+      'connector' = 'starrocks-inlong',
+      'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+      'load-url'='starrocks:8030',
+      'database-name'='test',
+      'table-name' = 'test_output1',
+      'username' = 'inlong',
+      'password' = 'inlong',
+      'sink.properties.format' = 'json',
+      'sink.properties.strip_outer_array' = 'true',
+      'sink.buffer-flush.interval-ms' = '1000'
+      );
+
+INSERT INTO test_output1 select * from test_input1;
+
+
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
new file mode 100644
index 0000000000..f69bdc20fc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
@@ -0,0 +1,178 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.15</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-mysql-cdc</name>
+
+    <properties>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <!-- Debezium dependencies -->
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.connector.mysql.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-cdc-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-log4j-appender</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>io.debezium:debezium-api</include>
+                                    <include>io.debezium:debezium-embedded</include>
+                                    <include>io.debezium:debezium-core</include>
+                                    <include>io.debezium:debezium-ddl-parser</include>
+                                    <include>io.debezium:debezium-connector-mysql</include>
+                                    <include>com.ververica:flink-connector-debezium</include>
+                                    <include>com.ververica:flink-connector-mysql-cdc</include>
+                                    <include>org.antlr:antlr4-runtime</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>mysql:mysql-connector-java</include>
+                                    <include>com.zendesk:mysql-binlog-connector-java</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <include>com.esri.geometry:esri-geometry-api</include>
+                                    <include>com.zaxxer:HikariCP</include>
+                                    <!--  Include fixed version 18.0-13.0 of flink shaded guava  -->
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>com.google.protobuf:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.kafka:*</artifact>
+                                    <excludes>
+                                        <exclude>kafka/kafka-version.properties</exclude>
+                                        <exclude>LICENSE</exclude>
+                                        <!-- Does not contain anything relevant.
+                                            Cites a binary dependency on jersey, but this is neither reflected in the
+                                            dependency graph, nor are any jersey files bundled. -->
+                                        <exclude>NOTICE</exclude>
+                                        <exclude>common/**</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.cdc.base</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.cdc.base</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.antlr</pattern>
+                                    <shadedPattern>com.ververica.cdc.connectors.shaded.org.antlr</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>com.ververica.cdc.connectors.shaded.com.fasterxml</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>com.ververica.cdc.connectors.shaded.com.google</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.esri.geometry</pattern>
+                                    <shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>com.ververica.cdc.connectors.shaded.com.zaxxer</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
new file mode 100644
index 0000000000..c684cc3ae4
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.MySqlTableSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
+import static com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils.PROPERTIES_PREFIX;
+import static com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils.getJdbcProperties;
+import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
+import static org.apache.inlong.sort.base.Constants.*;
+
+public class MysqlTableFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "mysql-cdc-inlong";
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validateExcept(
+                DEBEZIUM_OPTIONS_PREFIX, PROPERTIES_PREFIX);
+        final ReadableConfig config = helper.getOptions();
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
+        final String hostname = config.get(HOSTNAME);
+        final String username = config.get(USERNAME);
+        final String password = config.get(PASSWORD);
+        final String databaseName = config.get(DATABASE_NAME);
+        validateRegex(DATABASE_NAME.key(), databaseName);
+        final String tableName = config.get(TABLE_NAME);
+        validateRegex(TABLE_NAME.key(), tableName);
+        int port = config.get(PORT);
+        int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+        int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+        ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+
+        String serverId = validateAndGetServerId(config);
+        StartupOptions startupOptions = getStartupOptions(config);
+        Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+        int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+        int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+        double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
+        boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        final String chunkKeyColumn = config.get(CHUNK_KEY_COLUMN);
+        if (enableParallelRead) {
+            validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
+            validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
+            validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+            validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+            validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+            validateDistributionFactorUpper(distributionFactorUpper);
+            validateDistributionFactorLower(distributionFactorLower);
+        }
+
+        return new MySqlTableSource(physicalSchema,
+                port,
+                hostname,
+                databaseName,
+                tableName,
+                username,
+                password,
+                serverTimeZone,
+                getDebeziumProperties(context.getCatalogTable().getOptions()),
+                serverId,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                scanNewlyAddedTableEnabled,
+                getJdbcProperties(context.getCatalogTable().getOptions()),
+                heartbeatInterval,
+                chunkKeyColumn);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTNAME);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(DATABASE_NAME);
+        options.add(TABLE_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PORT);
+        options.add(SERVER_TIME_ZONE);
+        options.add(SERVER_ID);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        options.add(CHUNK_META_GROUP_SIZE);
+        options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+        options.add(CONNECT_TIMEOUT);
+        options.add(CONNECTION_POOL_SIZE);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        options.add(CONNECT_MAX_RETRIES);
+        options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        options.add(HEARTBEAT_INTERVAL);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
+        options.add(ROW_KINDS_FILTERED);
+        options.add(AUDIT_KEYS);
+        options.add(GH_OST_DDL_CHANGE);
+        options.add(GH_OST_TABLE_REGEX);
+        options.add(CHUNK_KEY_COLUMN);
+        return options;
+    }
+
+    public static final ConfigOption<String> CHUNK_KEY_COLUMN =
+            ConfigOptions.key("chunk-key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The chunk key column");
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the MySQL database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the MySQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the MySQL user to use when connecting to the MySQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the MySQL database server.");
+
+    public static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the MySQL server to monitor.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the MySQL database to monitor.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database server.");
+
+    public static final ConfigOption<String> SERVER_ID =
+            ConfigOptions.key("server-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A numeric ID or a numeric ID range of this database client, "
+                                    + "The numeric ID syntax is like '5400', the numeric ID range syntax "
+                                    + "is like '5400-5408', The numeric ID range syntax is recommended when "
+                                    + "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all "
+                                    + "currently-running database processes in the MySQL cluster. This connector"
+                                    + " joins the MySQL cluster as another server (with this unique ID) "
+                                    + "so it can read the binlog. By default, a random number is generated between"
+                                    + " 5400 and 6400, though we recommend setting an explicit value.");
+
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, the incremental "
+                                    + "snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in the chunk "
+                                    + "granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire global read lock "
+                                    + "(FLUSH TABLES WITH READ LOCK) before snapshot reading.\n"
+                                    + "If you would like the source run in parallel, each parallel "
+                                    + "reader should have an unique server id, "
+                                    + "so the 'server-id' must be a range like '5400-6400', "
+                                    + "and the range must be larger than the parallelism.");
+
+    public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table snapshot, "
+                                    + "captured tables are split into multiple "
+                                    + "chunks when read the snapshot of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after "
+                                    + "trying to connect to the MySQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should retry to build "
+                                    + "MySQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for MySQL CDC consumer, valid "
+                                    + "enumerations are "
+                                    + "\"initial\", \"earliest-offset\", "
+                                    + "\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" startup mode");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for tracing the "
+                                    + "latest available binlog offsets");
+
+    public static final ConfigOption<String> ROW_KINDS_FILTERED =
+            ConfigOptions.key("row-kinds-filtered")
+                    .stringType()
+                    .defaultValue("+I&-U&+U&-D")
+                    .withDescription("row kinds to be filtered,"
+                            + " here filtered means keep the data of certain row kind."
+                            + " The format follows rowKind1&rowKind2, supported row kinds are:\n"
+                            + "\"+I\" represents INSERT.\n"
+                            + "\"-U\" represents UPDATE_BEFORE.\n"
+                            + "\"+U\" represents UPDATE_AFTER.\n"
+                            + "\"-D\" represents DELETE.");
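+
+    // Hedged example for the option above: keep only inserts and the "after" image
+    // of updates. Note that "filtered" here means kept, not dropped.
+    //
+    //   'row-kinds-filtered' = '+I&+U'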
+
+    // ----------------------------------------------------------------------------
+    // experimental options, won't add them to documentation
+    // ----------------------------------------------------------------------------
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size exceeds the "
+                                    + "group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    .withDescription(
+                            "The upper bound of split key distribution factor. The distribution "
+                                    + "factor is used to determine whether the"
+                                    + " table is evenly distributed or not."
+                                    + " Table chunks would be calculated evenly, as an optimization, "
+                                    + "when the data distribution is even,"
+                                    + " and MySQL would be queried for splitting when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - "
+                                    + "MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    .withDescription(
+                            "The lower bound of split key distribution factor. The distribution "
+                                    + "factor is used to determine whether the"
+                                    + " table is evenly distributed or not."
+                                    + " Table chunks would be calculated evenly, as an optimization, "
+                                    + "when the data distribution is even,"
+                                    + " and MySQL would be queried for splitting when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - "
+                                    + "MIN(id) + 1) / rowCount.");
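+
+    // Worked example of the factor used by both bounds above: for a table with
+    // MIN(id) = 1, MAX(id) = 10000 and rowCount = 8000, the factor is
+    // (10000 - 1 + 1) / 8000 = 1.25. That lies within the default bounds
+    // [0.05, 1000.0], so the table is treated as evenly distributed and chunks
+    // are computed arithmetically instead of by querying MySQL per split.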
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to scan the newly added tables or not, by default is false.");
+
+    private void validateRegex(String optionName, String regex) {
+        try {
+            Pattern.compile(regex);
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "The %s '%s' is not a valid regular expression", optionName, regex),
+                    e);
+        }
+    }
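+
+    // For instance (hypothetical option values): validateRegex("database-name",
+    // "db_.*") passes, while validateRegex("database-name", "db_(") throws a
+    // ValidationException because "db_(" is not a compilable java.util.regex pattern.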
+
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+    private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+    private static StartupOptions getStartupOptions(ReadableConfig config) {
+        String modeString = config.get(SCAN_STARTUP_MODE);
+
+        switch (modeString.toLowerCase()) {
+            case SCAN_STARTUP_MODE_VALUE_INITIAL:
+                return StartupOptions.initial();
+
+            case SCAN_STARTUP_MODE_VALUE_LATEST:
+                return StartupOptions.latest();
+
+            case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+                return StartupOptions.earliest();
+
+            case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
+                validateSpecificOffset(config);
+                return getSpecificOffset(config);
+
+            case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+                return StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
+
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s",
+                                SCAN_STARTUP_MODE.key(),
+                                SCAN_STARTUP_MODE_VALUE_INITIAL,
+                                SCAN_STARTUP_MODE_VALUE_EARLIEST,
+                                SCAN_STARTUP_MODE_VALUE_LATEST,
+                                SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
+                                SCAN_STARTUP_MODE_VALUE_TIMESTAMP,
+                                modeString));
+        }
+    }
+
+    private static void validateSpecificOffset(ReadableConfig config) {
+        Optional<String> gtidSet = config.getOptional(
+                MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
+        Optional<String> binlogFilename = config.getOptional(
+                MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+        Optional<Long> binlogPosition = config.getOptional(
+                MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+        if (!gtidSet.isPresent() && !(binlogFilename.isPresent() && binlogPosition.isPresent())) {
+            throw new ValidationException(
+                    String.format(
+                            "Unable to find a valid binlog offset. Either %s, or %s and %s are required.",
+                            MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(),
+                            MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(),
+                            MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key()));
+        }
+    }
+
+    private static StartupOptions getSpecificOffset(ReadableConfig config) {
+        BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+
+        // GTID set
+        config.getOptional(
+                MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
+                .ifPresent(offsetBuilder::setGtidSet);
+
+        // Binlog file + pos
+        Optional<String> binlogFilename = config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+        Optional<Long> binlogPosition = config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+        if (binlogFilename.isPresent() && binlogPosition.isPresent()) {
+            offsetBuilder.setBinlogFilePosition(binlogFilename.get(), binlogPosition.get());
+        } else {
+            offsetBuilder.setBinlogFilePosition("", 0);
+        }
+
+        config.getOptional(
+                MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+                .ifPresent(offsetBuilder::setSkipEvents);
+        config.getOptional(
+                MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+                .ifPresent(offsetBuilder::setSkipRows);
+        return StartupOptions.specificOffset(offsetBuilder.build());
+    }
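+
+    // Hedged 'specific-offset' sketch; the option key spellings are inferred from
+    // the MySqlSourceOptions constants referenced above, and the file/position
+    // values are assumed. Either a GTID set, or a binlog file plus position, must
+    // be supplied, e.g.
+    //
+    //   'scan.startup.mode' = 'specific-offset',
+    //   'scan.startup.specific-offset.file' = 'mysql-bin.000003',
+    //   'scan.startup.specific-offset.pos' = '4'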
+
+    private void validateDistributionFactorLower(double distributionFactorLower) {
+        checkState(
+                doubleCompare(distributionFactorLower, 0.0d) >= 0
+                        && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+                String.format(
+                        "The value of option '%s' must be between %s and %s inclusively, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+                        0.0d,
+                        1.0d,
+                        distributionFactorLower));
+    }
+
+    private void validateDistributionFactorUpper(double distributionFactorUpper) {
+        checkState(
+                doubleCompare(distributionFactorUpper, 1.0d) >= 0,
+                String.format(
+                        "The value of option '%s' must be larger than or equal to %s, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+                        1.0d,
+                        distributionFactorUpper));
+    }
+
+    private void validateIntegerOption(
+            ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+        checkState(
+                optionValue > exclusiveMin,
+                String.format(
+                        "The value of option '%s' must be larger than %d, but is %d",
+                        option.key(), exclusiveMin, optionValue));
+    }
+
+    private String validateAndGetServerId(ReadableConfig configuration) {
+        final String serverIdValue = configuration.get(MySqlSourceOptions.SERVER_ID);
+        if (serverIdValue != null) {
+            // validation
+            try {
+                ServerIdRange.from(serverIdValue);
+            } catch (Exception e) {
+                throw new ValidationException(
+                        String.format(
+                                "The value of option 'server-id' is invalid: '%s'", serverIdValue),
+                        e);
+            }
+        }
+        return serverIdValue;
+    }
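+
+    // Illustrative 'server-id' values accepted by ServerIdRange.from: a single id
+    // such as "5400", or an inclusive range such as "5400-6400"; a malformed value
+    // like "5400-" fails the validation above. (Examples are assumptions about the
+    // range parser, not exhaustive.)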
+
+    public static final ConfigOption<String> INLONG_METRIC =
+            ConfigOptions.key("inlong.metric.labels")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                            + " default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+
+    public static final ConfigOption<String> INLONG_AUDIT =
+            ConfigOptions.key(METRICS_AUDIT_PROXY_HOSTS_KEY)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                            + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+
+    public static final ConfigOption<String> AUDIT_KEYS =
+            ConfigOptions.key("metrics.audit.key")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("Audit keys for metrics collecting");
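+
+    // Hedged example values for the InLong-specific options above. The hosts and
+    // labels are assumptions, and the audit option key is written here assuming
+    // METRICS_AUDIT_PROXY_HOSTS_KEY resolves to 'metrics.audit.proxy.hosts':
+    //
+    //   'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',
+    //   'metrics.audit.proxy.hosts' = '127.0.0.1:10081'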
+
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..da4dc53894
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.mysql.MysqlTableFactory
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 6756f785d0..73a81bf262 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
     <modules>
         <module>postgres-cdc</module>
         <module>starrocks</module>
+        <module>mysql-cdc</module>
         <module>iceberg</module>
     </modules>
 
diff --git a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
index 456b556002..5c3bab289a 100644
--- a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
+++ b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import java.util.Map;
+
 import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
 import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
 import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
