This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a3b3fc7f70 [INLONG-8967][Sort] Add Mysql connector on flink 1.15 (#8980) a3b3fc7f70 is described below commit a3b3fc7f70cb1e78335888daede75ebb44d62ca4 Author: EpicMo <1982742...@qq.com> AuthorDate: Sat Oct 7 09:50:50 2023 +0800 [INLONG-8967][Sort] Add Mysql connector on flink 1.15 (#8980) --- .../src/main/assemblies/sort-connectors-v1.15.xml | 8 + inlong-sort/sort-core/pom.xml | 6 + .../sort-end-to-end-tests-v1.15/pom.xml | 8 + .../apache/inlong/sort/tests/MysqlToRocksTest.java | 170 +++++++ .../inlong/sort/tests/PostgresToStarRocksTest.java | 88 +--- .../sort/tests/utils/FlinkContainerTestEnv.java | 1 + .../inlong/sort/tests/utils/MySqlContainer.java | 8 +- .../sort/tests/utils/StarRocksContainer.java | 5 - .../inlong/sort/tests/utils/StarRocksManager.java | 79 +++ .../src/test/resources/docker/mysql/my.cnf | 3 +- .../src/test/resources/docker/mysql/setup.sql | 3 +- .../src/test/resources/flinkSql/mysql_test.sql | 38 ++ .../sort-connectors/mysql-cdc/pom.xml | 178 +++++++ .../inlong/sort/mysql/MysqlTableFactory.java | 537 +++++++++++++++++++++ .../org.apache.flink.table.factories.Factory | 16 + .../sort-flink-v1.15/sort-connectors/pom.xml | 1 + .../sort/formats/json/utils/FormatJsonUtil.java | 1 + 17 files changed, 1072 insertions(+), 78 deletions(-) diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml index f61fbc0e9e..184e727a91 100644 --- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml +++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml @@ -51,5 +51,13 @@ </includes> <fileMode>0644</fileMode> </fileSet> + <fileSet> + <directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/target</directory> + <outputDirectory>inlong-sort/connectors</outputDirectory> + <includes> + <include>sort-connector-mysql-v1.15-${project.version}.jar</include> + </includes> + <fileMode>0644</fileMode> + </fileSet> </fileSets> </assembly> diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index 0d2626b748..b18604b67d 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -257,6 +257,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-mysql-cdc-v1.15</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml index 00cc1803ce..6841331770 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml @@ -165,6 +165,14 @@ <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-mysql-cdc-v1.15</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-mysql-cdc.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> <artifactItem> <groupId>org.apache.inlong</groupId> <artifactId>sort-connector-starrocks-v1.15</artifactId> diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java new file mode 100644 index 0000000000..e02501cfd1 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests; + +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.JdbcProxy; +import org.apache.inlong.sort.tests.utils.MySqlContainer; +import org.apache.inlong.sort.tests.utils.StarRocksContainer; +import org.apache.inlong.sort.tests.utils.StarRocksManager; +import org.apache.inlong.sort.tests.utils.TestUtils; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; + +/** + * End-to-end tests for sort-connector-mysql-cdc-v1.15 uber jar.
+ * Test flink sql Mysql cdc to StarRocks + */ +public class MysqlToRocksTest extends FlinkContainerTestEnv { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksTest.class); + + private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar"); + private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); + private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); + private static final String sqlFile; + + static { + try { + sqlFile = + Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString(); + StarRocksManager.buildStarRocksImage(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @ClassRule + public static StarRocksContainer STAR_ROCKS = + (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) + .withExposedPorts(9030, 8030, 8040) + .withNetwork(NETWORK) + .withAccessToHost(true) + .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); + + @ClassRule + public static final MySqlContainer MYSQL_CONTAINER = + (MySqlContainer) new MySqlContainer(MySqlContainer.MySqlVersion.V8_0) + .withDatabaseName("test") + .withNetwork(NETWORK) + .withNetworkAliases("mysql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @Before + public void setup() { + waitUntilJobRunning(Duration.ofSeconds(30)); + initializeMysqlTable(); + initializeStarRocksTable(STAR_ROCKS); + } + + private void initializeMysqlTable() { + try { + Class.forName(MYSQL_CONTAINER.getDriverClassName()); + Connection conn = DriverManager + .getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement stat = conn.createStatement(); + stat.execute( + "CREATE TABLE test_input1 (\n" + + " id SERIAL,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512),\n" + + " PRIMARY KEY(id)\n" + + ");"); + stat.close(); + conn.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void teardown() { + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.stop(); + } + if (STAR_ROCKS != null) { + STAR_ROCKS.stop(); + } + } + + /** + * Test flink sql Mysql cdc to StarRocks + * + * @throws Exception The exception may be thrown when executing the case + */ + @Test + public void testMysqlUpdateAndDelete() throws Exception { + submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar); + waitUntilJobRunning(Duration.ofSeconds(10)); + + // generate input + try (Connection conn = + DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute( + "INSERT INTO test_input1 " + + "VALUES (1,'jacket','water resistent white wind breaker');"); + stat.execute( + "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');"); + stat.execute( + "update test_input1 set name = 'tom' where id = 2;"); + stat.execute( + "delete from test_input1 where id = 1;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + JdbcProxy proxy = + new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), + STAR_ROCKS.getPassword(), + STAR_ROCKS.getDriverClassName()); + List<String> expectResult = + Arrays.asList("2,tom,Big 2-wheel scooter "); + proxy.checkResultWithTimeout( + expectResult, + "test_output1", + 3, + 60000L); + } +} \ No
newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java index 499cc4b7f2..5c7208a7f6 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java @@ -28,12 +28,9 @@ import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; import java.net.URISyntaxException; import java.nio.file.Path; @@ -45,8 +42,12 @@ import java.sql.Statement; import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.stream.Stream; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; /** * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. 
* Test flink sql Postgres cdc to StarRocks @@ -58,72 +59,42 @@ public class PostgresToStarRocksTest extends FlinkContainerTestEnv { private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar"); private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); - - private static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class); - private static final String sqlFile; - // ---------------------------------------------------------------------------------------- - // StarRocks Variables - // ---------------------------------------------------------------------------------------- - private static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks"; - private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks"; - private static final String NEW_STARROCKS_TAG = "latest"; - private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4"; - static { try { - sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()).toString(); + sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()) + .toString(); buildStarRocksImage(); } catch (URISyntaxException e) { throw new RuntimeException(e); } } - private static String getNewStarRocksImageName() { - return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG; - } - - public static void buildStarRocksImage() { - GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME); - Startables.deepStart(Stream.of(oldStarRocks)).join(); - oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"), - "/data/deploy/"); - try { - oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh"); - } catch (Exception e) { - e.printStackTrace(); - } - oldStarRocks.getDockerClient() - .commitCmd(oldStarRocks.getContainerId()) - .withRepository(NEW_STARROCKS_REPOSITORY) - .withTag(NEW_STARROCKS_TAG).exec(); - oldStarRocks.stop(); - } - @ClassRule - public static StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) - .withExposedPorts(9030, 8030, 8040) - .withNetwork(NETWORK) - .withAccessToHost(true) - .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) - .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); + public static StarRocksContainer STAR_ROCKS = + (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) + .withExposedPorts(9030, 8030, 8040) + .withNetwork(NETWORK) + .withAccessToHost(true) + .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); @ClassRule public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer( DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres")) - .withUsername("flinkuser") - .withPassword("flinkpw") - .withDatabaseName("test") - .withNetwork(NETWORK) - .withNetworkAliases("postgres") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withUsername("flinkuser") + .withPassword("flinkpw") + .withDatabaseName("test") + .withNetwork(NETWORK) + .withNetworkAliases("postgres") + .withLogConsumer(new Slf4jLogConsumer(LOG)); @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); initializePostgresTable(); - initializeStarRocksTable(); + 
initializeStarRocksTable(STAR_ROCKS); } private void initializePostgresTable() { @@ -149,23 +120,6 @@ public class PostgresToStarRocksTest extends FlinkContainerTestEnv { } } - private void initializeStarRocksTable() { - try (Connection conn = - DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), - STAR_ROCKS.getPassword()); - Statement stat = conn.createStatement()) { - stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n" - + " id INT NOT NULL,\n" - + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" - + " description VARCHAR(512)\n" - + ")\n" - + "PRIMARY KEY(id)\n" - + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\" = \"1\");"); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - @AfterClass public static void teardown() { if (POSTGRES_CONTAINER != null) { diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java index c328ac951e..2426c57ae4 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -117,6 +117,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger { .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT) .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withExposedPorts(JOB_MANAGER_REST_PORT) .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); taskManager = new GenericContainer<>("flink:1.15.4-scala_2.12") diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java index 11635e5619..75ba466f1d 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java @@ -49,7 +49,7 @@ public class MySqlContainer extends JdbcDatabaseContainer { public MySqlContainer(MySqlVersion version) { super(DockerImageName.parse(IMAGE + ":" + version.getVersion())); - addExposedPort(MYSQL_PORT); + addFixedExposedPort(33306, 3306); } @Override @@ -60,8 +60,6 @@ public class MySqlContainer extends JdbcDatabaseContainer { @Override protected void configure() { // HERE is the difference, copy to /etc/mysql/, if copy to /etc/mysql/conf.d will be wrong - optionallyMapResourceParameterAsVolume( - MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf"); if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) { optionallyMapResourceParameterAsVolume( @@ -79,6 +77,7 @@ public class MySqlContainer extends JdbcDatabaseContainer { throw new ContainerLaunchException( "Empty password can be used only with the root user"); } + withCommand("--default-authentication-plugin=mysql_native_password"); setStartupAttempts(3); } @@ -100,7 +99,8 @@ public class MySqlContainer extends JdbcDatabaseContainer { + getDatabasePort() + "/" + databaseName - + additionalUrlParams; + + additionalUrlParams + + 
"?useSSL=false&allowPublicKeyRetrieval=true"; } @Override diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java index 47f0d673c8..4db7538058 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java @@ -19,14 +19,10 @@ package org.apache.inlong.sort.tests.utils; import org.apache.commons.lang3.StringUtils; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.utility.DockerImageName; -import java.io.IOException; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import static java.util.stream.Collectors.joining; @@ -103,7 +99,6 @@ public class StarRocksContainer extends GenericContainer { return getMappedPort(STAR_ROCKS_QUERY_PORT); } - public String getDatabaseName() { return databaseName; } diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java new file mode 100644 index 0000000000..3c60a4b3eb --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.tests.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.stream.Stream; + +public class StarRocksManager { + + // ---------------------------------------------------------------------------------------- + // StarRocks Variables + // ---------------------------------------------------------------------------------------- + public static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks"; + private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks"; + private static final String NEW_STARROCKS_TAG = "latest"; + private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4"; + public static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class); + public static void buildStarRocksImage() { + GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME); + Startables.deepStart(Stream.of(oldStarRocks)).join(); + oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"), + "/data/deploy/"); + try { + oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh"); + } catch (Exception e) { + e.printStackTrace(); + } + oldStarRocks.getDockerClient() + .commitCmd(oldStarRocks.getContainerId()) + .withRepository(NEW_STARROCKS_REPOSITORY) + .withTag(NEW_STARROCKS_TAG).exec(); + oldStarRocks.stop(); + } + + public static String getNewStarRocksImageName() { + return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG; + } + + public static void initializeStarRocksTable(StarRocksContainer STAR_ROCKS) { + try (Connection conn = + DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), + STAR_ROCKS.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n" + + " id INT NOT NULL,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512)\n" + + ")\n" + + "PRIMARY KEY(id)\n" + + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\" = \"1\");"); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf index 87a492c496..bbf52cd615 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf @@ -60,4 +60,5 @@ binlog_format = row # enable gtid mode gtid_mode = on -enforce_gtid_consistency = on \ No newline at end of file +enforce_gtid_consistency = on +default_authentication_plugin=mysql_native_password \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql index 9ec4b48bbd..73c4be16a7 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql +++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql @@ -22,4 +22,5 @@ -- GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; CREATE USER 'inlong' IDENTIFIED BY 'inlong'; -GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%'; +ALTER USER 'root'@'%' IDENTIFIED WITH 'mysql_native_password' BY 'inlong'; +FLUSH PRIVILEGES; \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql new file mode 100644 index 0000000000..9f74d54ae7 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql @@ -0,0 +1,38 @@ +CREATE TABLE test_input1 ( + `id` INT primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'mysql-cdc-inlong', + 'hostname' = 'mysql', + 'port' = '3306', + 'username' = 'root', + 'password' = 'inlong', + 'database-name' = 'test', + 'table-name' = 'test_input1', + 'scan.incremental.snapshot.enabled' = 'false', + 'jdbc.properties.useSSL' = 'false', + 'jdbc.properties.allowPublicKeyRetrieval' = 'true' + ); + +CREATE TABLE test_output1 ( + `id` INT primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'starrocks-inlong', + 'jdbc-url' = 'jdbc:mysql://starrocks:9030', + 'load-url'='starrocks:8030', + 'database-name'='test', + 'table-name' = 'test_output1', + 'username' = 'inlong', + 'password' = 'inlong', + 'sink.properties.format' = 'json', + 'sink.properties.strip_outer_array' = 'true', + 'sink.buffer-flush.interval-ms' = '1000' + ); + +INSERT INTO test_output1 select * from test_input1; + + + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml new file mode 100644 index 0000000000..f69bdc20fc --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml @@ -0,0 +1,178 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connectors-v1.15</artifactId> + <version>1.10.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-connector-mysql-cdc-v1.15</artifactId> + <packaging>jar</packaging> + <name>Apache InLong - Sort-connector-mysql-cdc</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + <!-- Debezium dependencies --> + <dependency> + <groupId>com.ververica</groupId> + <artifactId>flink-connector-mysql-cdc</artifactId> + <version>${flink.connector.mysql.cdc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-cdc-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.ververica</groupId> + <artifactId>flink-connector-debezium</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-log4j-appender</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka-clients.version}</version> + </dependency> + <dependency> + <groupId>io.debezium</groupId> + <artifactId>debezium-connector-mysql</artifactId> + <version>${debezium.version}</version> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + <configuration> + <artifactSet> + <includes> + <include>org.apache.inlong:*</include> + <include>io.debezium:debezium-api</include> + <include>io.debezium:debezium-embedded</include> + <include>io.debezium:debezium-core</include> + <include>io.debezium:debezium-ddl-parser</include> + <include>io.debezium:debezium-connector-mysql</include> + <include>com.ververica:flink-connector-debezium</include> + <include>com.ververica:flink-connector-mysql-cdc</include> + <include>org.antlr:antlr4-runtime</include> + <include>org.apache.kafka:*</include> + <include>mysql:mysql-connector-java</include> + <include>com.zendesk:mysql-binlog-connector-java</include> + <include>com.fasterxml.*:*</include> + <include>com.google.guava:*</include> + <include>com.esri.geometry:esri-geometry-api</include> + <include>com.zaxxer:HikariCP</include> + <!-- Include fixed version 18.0-13.0 of flink shaded guava --> + <include>org.apache.flink:flink-shaded-guava</include> + <include>com.google.protobuf:*</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>org.apache.inlong:sort-connector-*</artifact> + <includes> + <include>org/apache/inlong/**</include> + <include>META-INF/services/org.apache.flink.table.factories.Factory</include> + </includes> + </filter> + <filter> + <artifact>org.apache.kafka:*</artifact> + <excludes> + <exclude>kafka/kafka-version.properties</exclude> 
+ <exclude>LICENSE</exclude> + <!-- Does not contain anything relevant. + Cites a binary dependency on jersey, but this is neither reflected in the + dependency graph, nor are any jersey files bundled. --> + <exclude>NOTICE</exclude> + <exclude>common/**</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern>org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.inlong.sort.cdc.base</pattern> + <shadedPattern>org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.cdc.base</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.kafka</pattern> + <shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka</shadedPattern> + </relocation> + <relocation> + <pattern>org.antlr</pattern> + <shadedPattern>com.ververica.cdc.connectors.shaded.org.antlr</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml</pattern> + <shadedPattern>com.ververica.cdc.connectors.shaded.com.fasterxml</shadedPattern> + </relocation> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>com.ververica.cdc.connectors.shaded.com.google</shadedPattern> + </relocation> + <relocation> + <pattern>com.esri.geometry</pattern> + <shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern> + </relocation> + <relocation> + <pattern>com.zaxxer</pattern> + <shadedPattern>com.ververica.cdc.connectors.shaded.com.zaxxer</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java new file mode 100644 index 0000000000..c684cc3ae4 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.mysql; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; +import com.ververica.cdc.connectors.mysql.table.MySqlTableSource; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Pattern; + +import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; +import static com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils.PROPERTIES_PREFIX; +import static com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils.getJdbcProperties; +import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; +import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; +import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY; +import static org.apache.inlong.sort.base.Constants.*; + +public class MysqlTableFactory implements DynamicTableSourceFactory { + + private static final String IDENTIFIER = "mysql-cdc-inlong"; + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + helper.validateExcept( + DEBEZIUM_OPTIONS_PREFIX, PROPERTIES_PREFIX); + final ReadableConfig config = helper.getOptions(); + ResolvedSchema physicalSchema = + getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); + final String hostname = config.get(HOSTNAME); + final String username = config.get(USERNAME); + final String password = config.get(PASSWORD); + final String databaseName = config.get(DATABASE_NAME); + validateRegex(DATABASE_NAME.key(), databaseName); + final String tableName = config.get(TABLE_NAME); + validateRegex(TABLE_NAME.key(), tableName); + int port = config.get(PORT); + int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); + int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); + ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); + + String serverId = validateAndGetServerId(config); + StartupOptions startupOptions = getStartupOptions(config); + Duration connectTimeout = config.get(CONNECT_TIMEOUT); + int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); + int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); + double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + double distributionFactorLower = 
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); + Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); + boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + final String chunkKeyColumn = config.get(CHUNK_KEY_COLUMN); + if (enableParallelRead) { + validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); + validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); + validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); + validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1); + validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0); + validateDistributionFactorUpper(distributionFactorUpper); + validateDistributionFactorLower(distributionFactorLower); + } + + return new MySqlTableSource(physicalSchema, + port, + hostname, + databaseName, + tableName, + username, + password, + serverTimeZone, + getDebeziumProperties(context.getCatalogTable().getOptions()), + serverId, + enableParallelRead, + splitSize, + splitMetaGroupSize, + fetchSize, + connectTimeout, + connectMaxRetries, + connectionPoolSize, + distributionFactorUpper, + distributionFactorLower, + startupOptions, + scanNewlyAddedTableEnabled, + getJdbcProperties(context.getCatalogTable().getOptions()), + heartbeatInterval, + chunkKeyColumn); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(HOSTNAME); + options.add(USERNAME); + options.add(PASSWORD); + options.add(DATABASE_NAME); + options.add(TABLE_NAME); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(PORT); + options.add(SERVER_TIME_ZONE); + options.add(SERVER_ID); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + options.add(CHUNK_META_GROUP_SIZE); + options.add(SCAN_SNAPSHOT_FETCH_SIZE); + options.add(CONNECT_TIMEOUT); + options.add(CONNECTION_POOL_SIZE); + options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + options.add(CONNECT_MAX_RETRIES); + options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); + options.add(HEARTBEAT_INTERVAL); + options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); + options.add(ROW_KINDS_FILTERED); + options.add(AUDIT_KEYS); + options.add(GH_OST_DDL_CHANGE); + options.add(GH_OST_TABLE_REGEX); + options.add(CHUNK_KEY_COLUMN); + return options; + } + + public static final ConfigOption<String> CHUNK_KEY_COLUMN = + ConfigOptions.key("chunk-key-column") + .stringType() + .noDefaultValue() + .withDescription("The chunk key column"); + + public static final ConfigOption<String> HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the MySQL database server."); + + public static final ConfigOption<Integer> PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(3306) + .withDescription("Integer port number of the MySQL database server."); + + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the MySQL user to use when connecting to the MySQL database
server."); + + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription( + "Password to use when connecting to the MySQL database server."); + + public static final ConfigOption<String> DATABASE_NAME = + ConfigOptions.key("database-name") + .stringType() + .noDefaultValue() + .withDescription("Database name of the MySQL server to monitor."); + + public static final ConfigOption<String> TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription("Table name of the MySQL database to monitor."); + + public static final ConfigOption<String> SERVER_TIME_ZONE = + ConfigOptions.key("server-time-zone") + .stringType() + .defaultValue("UTC") + .withDescription("The session time zone in database server."); + + public static final ConfigOption<String> SERVER_ID = + ConfigOptions.key("server-id") + .stringType() + .noDefaultValue() + .withDescription( + "A numeric ID or a numeric ID range of this database client, " + + "The numeric ID syntax is like '5400', the numeric ID range syntax " + + "is like '5400-5408', The numeric ID range syntax is recommended when " + + "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all " + + "currently-running database processes in the MySQL cluster. This connector" + + " joins the MySQL cluster as another server (with this unique ID) " + + "so it can read the binlog. By default, a random number is generated between" + + " 5400 and 6400, though we recommend setting an explicit value."); + + public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED = + ConfigOptions.key("scan.incremental.snapshot.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Incremental snapshot is a new mechanism to read snapshot of a table. 
" + + "Compared to the old snapshot mechanism, the incremental " + + "snapshot has many advantages, including:\n" + + "(1) source can be parallel during snapshot reading, \n" + + "(2) source can perform checkpoints in the chunk " + + "granularity during snapshot reading, \n" + + "(3) source doesn't need to acquire global read lock " + + "(FLUSH TABLES WITH READ LOCK) before snapshot reading.\n" + + "If you would like the source run in parallel, each parallel " + + "reader should have an unique server id, " + + "so the 'server-id' must be a range like '5400-6400', " + + "and the range must be larger than the parallelism."); + + public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE = + ConfigOptions.key("scan.incremental.snapshot.chunk.size") + .intType() + .defaultValue(8096) + .withDescription( + "The chunk size (number of rows) of table snapshot, " + + "captured tables are split into multiple " + + "chunks when read the snapshot of table."); + + public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE = + ConfigOptions.key("scan.snapshot.fetch.size") + .intType() + .defaultValue(1024) + .withDescription( + "The maximum fetch size for per poll when read table snapshot."); + + public static final ConfigOption<Duration> CONNECT_TIMEOUT = + ConfigOptions.key("connect.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "The maximum time that the connector should wait after " + + "trying to connect to the MySQL database server before timing out."); + + public static final ConfigOption<Integer> CONNECTION_POOL_SIZE = + ConfigOptions.key("connection.pool.size") + .intType() + .defaultValue(20) + .withDescription("The connection pool size."); + + public static final ConfigOption<Integer> CONNECT_MAX_RETRIES = + ConfigOptions.key("connect.max-retries") + .intType() + .defaultValue(3) + .withDescription( + "The max retry times that the connector should retry to build " + + "MySQL database server connection."); + + public static final ConfigOption<String> SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("initial") + .withDescription( + "Optional startup mode for MySQL CDC consumer, valid " + + "enumerations are " + + "\"initial\", \"earliest-offset\", " + + "\"latest-offset\", \"timestamp\"\n" + + "or \"specific-offset\""); + + public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = + ConfigOptions.key("scan.startup.timestamp-millis") + .longType() + .noDefaultValue() + .withDescription( + "Optional timestamp used in case of \"timestamp\" startup mode"); + + public static final ConfigOption<Duration> HEARTBEAT_INTERVAL = + ConfigOptions.key("heartbeat.interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "Optional interval of sending heartbeat event for tracing the " + + "latest available binlog offsets"); + + public static final ConfigOption<String> ROW_KINDS_FILTERED = + ConfigOptions.key("row-kinds-filtered") + .stringType() + .defaultValue("+I&-U&+U&-D") + .withDescription("row kinds to be filtered," + + " here filtered means keep the data of certain row kind" + + "the format follows rowKind1&rowKind2, supported row kinds are " + + "\"+I\" represents INSERT.\n" + + "\"-U\" represents UPDATE_BEFORE.\n" + + "\"+U\" represents UPDATE_AFTER.\n" + + "\"-D\" represents DELETE."); + + // ---------------------------------------------------------------------------- + // experimental options, won't add them to documentation + // 
---------------------------------------------------------------------------- + @Experimental + public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE = + ConfigOptions.key("chunk-meta.group.size") + .intType() + .defaultValue(1000) + .withDescription( + "The group size of chunk meta, if the meta size exceeds the " + + "group size, the meta will be divided into multiple groups."); + + @Experimental + public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + ConfigOptions.key("split-key.even-distribution.factor.upper-bound") + .doubleType() + .defaultValue(1000.0d) + .withDescription( + "The upper bound of split key distribution factor. The distribution " + + "factor is used to determine whether the" + + " table is evenly distributed or not." + + " The table chunks would use the even calculation optimization " + + "when the data distribution is even," + + " and the query to MySQL for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - " + + "MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + ConfigOptions.key("split-key.even-distribution.factor.lower-bound") + .doubleType() + .defaultValue(0.05d) + .withDescription( + "The lower bound of split key distribution factor. The distribution " + + "factor is used to determine whether the" + + " table is evenly distributed or not." + + " The table chunks would use the even calculation optimization " + + "when the data distribution is even," + + " and the query to MySQL for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - " + + "MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.newly-added-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to capture the newly added tables or not, false by default."); + + private void validateRegex(String optionName, String regex) { + try { + Pattern.compile(regex); + } catch (Exception e) { + throw new ValidationException( + String.format( + "The %s '%s' is not a valid regular expression", optionName, regex), + e); + } + } + + private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; + + private static StartupOptions getStartupOptions(ReadableConfig config) { + String modeString = config.get(SCAN_STARTUP_MODE); + + switch (modeString.toLowerCase()) { + case SCAN_STARTUP_MODE_VALUE_INITIAL: + return StartupOptions.initial(); + + case SCAN_STARTUP_MODE_VALUE_LATEST: + return StartupOptions.latest(); + + case SCAN_STARTUP_MODE_VALUE_EARLIEST: + return StartupOptions.earliest(); + + case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET: + validateSpecificOffset(config); + return getSpecificOffset(config); + + case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: + return StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS)); + + default: + throw new ValidationException( + String.format( + "Invalid value for option '%s'. 
Supported values are [%s, %s], but was: %s", + SCAN_STARTUP_MODE.key(), + SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_LATEST, + modeString)); + } + } + + private static void validateSpecificOffset(ReadableConfig config) { + Optional<String> gtidSet = config.getOptional( + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET); + Optional<String> binlogFilename = config.getOptional( + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Optional<Long> binlogPosition = config.getOptional( + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS); + if (!gtidSet.isPresent() && !(binlogFilename.isPresent() && binlogPosition.isPresent())) { + throw new ValidationException( + String.format( + "Unable to find a valid binlog offset. Either %s, or %s and %s are required.", + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(), + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(), + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key())); + } + } + + private static StartupOptions getSpecificOffset(ReadableConfig config) { + BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder(); + + // GTID set + config.getOptional( + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET) + .ifPresent(offsetBuilder::setGtidSet); + + // Binlog file + pos + Optional<String> binlogFilename = config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Optional<Long> binlogPosition = config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS); + if (binlogFilename.isPresent() && binlogPosition.isPresent()) { + offsetBuilder.setBinlogFilePosition(binlogFilename.get(), binlogPosition.get()); + } else { + offsetBuilder.setBinlogFilePosition("", 0); + } + + config.getOptional( + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS) + .ifPresent(offsetBuilder::setSkipEvents); + config.getOptional( + MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS) + .ifPresent(offsetBuilder::setSkipRows); + return StartupOptions.specificOffset(offsetBuilder.build()); + } + + private void validateDistributionFactorLower(double distributionFactorLower) { + checkState( + doubleCompare(distributionFactorLower, 0.0d) >= 0 + && doubleCompare(distributionFactorLower, 1.0d) <= 0, + String.format( + "The value of option '%s' must be between %s and %s inclusively, but is %s", + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), + 0.0d, + 1.0d, + distributionFactorLower)); + } + + private void validateDistributionFactorUpper(double distributionFactorUpper) { + checkState( + doubleCompare(distributionFactorUpper, 1.0d) >= 0, + String.format( + "The value of option '%s' must be larger than or equal to %s, but is %s", + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), + 1.0d, + distributionFactorUpper)); + } + + private void validateIntegerOption( + ConfigOption<Integer> option, int optionValue, int exclusiveMin) { + checkState( + optionValue > exclusiveMin, + String.format( + "The value of option '%s' must be larger than %d, but is %d", + option.key(), exclusiveMin, optionValue)); + } + + private String validateAndGetServerId(ReadableConfig configuration) { + final String serverIdValue = configuration.get(MySqlSourceOptions.SERVER_ID); + if (serverIdValue != null) { + // validation + try { + ServerIdRange.from(serverIdValue); + } catch (Exception e) { + throw new ValidationException( + String.format( + "The value of option 'server-id' is invalid: '%s'", serverIdValue), + e); + } + } + return serverIdValue; + } + + public static final ConfigOption<String> 
INLONG_METRIC = + ConfigOptions.key("inlong.metric.labels") + .stringType() + .noDefaultValue() + .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," + + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); + + public static final ConfigOption<String> INLONG_AUDIT = + ConfigOptions.key(METRICS_AUDIT_PROXY_HOSTS_KEY) + .stringType() + .noDefaultValue() + .withDescription("Audit proxy host address for reporting audit metrics. \n" + + "e.g. 127.0.0.1:10081,0.0.0.1:10081"); + + public static final ConfigOption<String> AUDIT_KEYS = + ConfigOptions.key("metrics.audit.key") + .stringType() + .defaultValue("") + .withDescription("Audit keys for metrics collecting"); + +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..da4dc53894 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.inlong.sort.mysql.MysqlTableFactory diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml index 6756f785d0..73a81bf262 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml @@ -34,6 +34,7 @@ <modules> <module>postgres-cdc</module> <module>starrocks</module> + <module>mysql-cdc</module> <module>iceberg</module> </modules> diff --git a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java index 456b556002..5c3bab289a 100644 --- a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java +++ b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java @@ -42,6 +42,7 @@ import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import java.util.Map; + import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION; import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE; import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
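---

Editor's note: for readers who want to try the new connector outside the end-to-end tests, below is a minimal sketch of driving the `mysql-cdc-inlong` factory through the Flink 1.15 Table API. It only restates the options already exercised by `mysql_test.sql` in this commit; the class name, hostname, credentials, and the `PRIMARY KEY ... NOT ENFORCED` declaration are illustrative placeholders, not part of the commit, and the `sort-connector-mysql-cdc-v1.15` jar is assumed to be on the classpath.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical driver class, not part of this commit.
public class MysqlCdcInlongQuickStart {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // MysqlTableFactory is discovered through the META-INF/services entry
        // added in this commit, under the factory identifier "mysql-cdc-inlong".
        tableEnv.executeSql(
                "CREATE TABLE mysql_source (\n"
                        + "  `id` INT,\n"
                        + "  name STRING,\n"
                        + "  description STRING,\n"
                        + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n" // placeholder host
                        + "  'port' = '3306',\n"
                        + "  'username' = 'root',\n" // placeholder credentials
                        + "  'password' = 'secret',\n"
                        + "  'database-name' = 'test',\n"
                        + "  'table-name' = 'test_input1'\n"
                        + ")");

        // Print the changelog stream; a real job would insert into a sink table
        // instead, as the e2e test does with the starrocks-inlong connector.
        tableEnv.executeSql("SELECT * FROM mysql_source").print();
    }
}
```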