This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a300d8dac7 [INLONG-8445][Sort] Support running tests on both Flink 1.13 and Flink 1.15 (#8475) a300d8dac7 is described below commit a300d8dac7294b4a088e794e1f049444a7a467f7 Author: haibo.duan <dhaibo1...@live.cn> AuthorDate: Tue Aug 1 12:55:49 2023 +0800 [INLONG-8445][Sort] Support running tests on both Flink 1.13 and Flink 1.15 (#8475) --- .github/workflows/ci_ut_flink15.yml | 4 +- inlong-sort/pom.xml | 4 +- inlong-sort/sort-dist/pom.xml | 3 + inlong-sort/sort-end-to-end-tests/pom.xml | 179 +---------------- .../sort-end-to-end-tests-v1.13/pom.xml | 220 +++++++++++++++++++++ .../apache/inlong/sort/tests/ClickHouseCase.java | 0 .../org/apache/inlong/sort/tests/KafkaE2ECase.java | 0 .../sort/tests/utils/FlinkContainerTestEnv.java | 0 .../apache/inlong/sort/tests/utils/JdbcProxy.java | 0 .../inlong/sort/tests/utils/MySqlContainer.java | 0 .../sort/tests/utils/PlaceholderResolver.java | 0 .../apache/inlong/sort/tests/utils/TestUtils.java | 0 .../src/test/resources/docker/mysql/my.cnf | 0 .../src/test/resources/docker/mysql/setup.sql | 0 .../test/resources/env/kafka_test_kafka_init.txt | 0 .../test/resources/env/kafka_test_mysql_init.txt | 0 .../test/resources/flinkSql/clickhouse_test.sql | 0 .../src/test/resources/flinkSql/kafka_test.sql | 0 .../src/test/resources/groupFile/kafka_test.json | 0 .../src/test/resources/log4j2-test.properties | 0 .../sort-end-to-end-tests-v1.15/pom.xml | 219 ++++++++++++++++++++ .../inlong/sort/tests/PostgresToStarRocksTest.java | 220 +++++++++++++++++++++ .../sort/tests/utils/FlinkContainerTestEnv.java | 44 +---- .../apache/inlong/sort/tests/utils/JdbcProxy.java | 0 .../inlong/sort/tests/utils/MySqlContainer.java | 0 .../sort/tests/utils/PlaceholderResolver.java | 0 .../sort/tests/utils/StarRocksContainer.java | 145 ++++++++++++++ .../apache/inlong/sort/tests/utils/TestUtils.java | 0 .../src/test/resources/docker/mysql/my.cnf | 0 .../src/test/resources/docker/mysql/setup.sql | 0 .../test/resources/docker/postgresql}/setup.sql | 9 +- .../test/resources/docker/starrocks/start_fe_be.sh | 72 +++++++ .../src/test/resources/flinkSql/postgres_test.sql | 39 ++++ .../src/test/resources/log4j2-test.properties | 28 +-- 34 files changed, 956 insertions(+), 230 deletions(-) diff --git a/.github/workflows/ci_ut_flink15.yml b/.github/workflows/ci_ut_flink15.yml index abaf605256..343af7dc39 100644 --- a/.github/workflows/ci_ut_flink15.yml +++ b/.github/workflows/ci_ut_flink15.yml @@ -55,12 +55,12 @@ jobs: restore-keys: ${{ runner.os }}-inlong-flink15 - name: Build for Flink 1.15 with Maven - run: mvn --update-snapshots -e -V package -U -pl inlong-sort/sort-core -am -Pv1.15 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000 + run: mvn --update-snapshots -e -V clean install -U -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000 env: CI: false - name: Unit test for Flink 1.15 with Maven - run: mvn --update-snapshots -e -V clean test -am -pl inlong-sort/sort-core -Pv1.15 + run: mvn --update-snapshots -e -V test -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15 env: CI: false diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml index 17daa583d7..86efbb46bd 100644 --- a/inlong-sort/pom.xml +++ b/inlong-sort/pom.xml @@ -37,6 +37,7 @@ <module>sort-core</module> <module>sort-dist</module> <module>sort-flink</module> + <module>sort-end-to-end-tests</module> </modules> <properties> @@ -166,9 +167,6 @@ <activeByDefault>true</activeByDefault> </activation> <!-- End-To-End after refactoring, you need to mention this module under modules. --> - <modules> - <module>sort-end-to-end-tests</module> - </modules> <properties> <sort.flink.version>v1.13</sort.flink.version> <flink.version>1.13.5</flink.version> diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml index 0733788826..aa1a0d5278 100644 --- a/inlong-sort/sort-dist/pom.xml +++ b/inlong-sort/sort-dist/pom.xml @@ -145,14 +145,17 @@ <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> + <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> + <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-avro</artifactId> + <version>${flink.version}</version> </dependency> </dependencies> </profile> diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml b/inlong-sort/sort-end-to-end-tests/pom.xml index 73fa404489..f71d610cba 100644 --- a/inlong-sort/sort-end-to-end-tests/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/pom.xml @@ -26,187 +26,28 @@ </parent> <artifactId>sort-end-to-end-tests</artifactId> + <packaging>pom</packaging> <name>Apache InLong - Sort End to End Tests</name> <properties> <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> </properties> - <dependencies> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>testcontainers</artifactId> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>jdbc</artifactId> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>mysql</artifactId> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>kafka</artifactId> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>clickhouse</artifactId> - <version>${testcontainers.version}</version> - </dependency> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-dist</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>${flink.test.utils.artifactId}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-slf4j-impl</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-flink-dependencies-${sort.flink.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>${flink.runtime.artifactId}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>ru.yandex.clickhouse</groupId> - <artifactId>clickhouse-jdbc</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - <profiles> <profile> <id>v1.13</id> <activation> <activeByDefault>true</activeByDefault> </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <configuration> - <artifactItems> - <artifactItem> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-dist</artifactId> - <version>${project.version}</version> - <destFileName>sort-dist.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies</outputDirectory> - </artifactItem> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>${mysql.jdbc.version}</version> - <destFileName>mysql-driver.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies</outputDirectory> - </dependency> - <artifactItem> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-connector-kafka</artifactId> - <version>${project.version}</version> - <destFileName>sort-connector-kafka.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies</outputDirectory> - </artifactItem> - <artifactItem> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-connector-mysql-cdc</artifactId> - <version>${project.version}</version> - <destFileName>sort-connector-mysql-cdc.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies</outputDirectory> - </artifactItem> - <artifactItem> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-connector-jdbc</artifactId> - <version>${project.version}</version> - <destFileName>sort-connector-jdbc.jar</destFileName> - <type>jar</type> - <outputDirectory>${project.build.directory}/dependencies</outputDirectory> - </artifactItem> - </artifactItems> - </configuration> - <executions> - <execution> - <id>copy-jars</id> - <goals> - <goal>copy</goal> - </goals> - <phase>pre-integration-test</phase> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <version>${plugin.failsafe.version}</version> - <executions> - <execution> - <id>end-to-end-tests</id> - <phase>integration-test</phase> - <configuration> - <includes> - <include>**/*.*</include> - </includes> - <forkCount>1</forkCount> - <systemPropertyVariables> - <moduleDir>${project.basedir}</moduleDir> - </systemPropertyVariables> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> + <modules> + <module>sort-end-to-end-tests-v1.13</module> + </modules> + </profile> + <profile> + <id>v1.15</id> + <modules> + <module>sort-end-to-end-tests-v1.15</module> + </modules> </profile> </profiles> diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml new file mode 100644 index 0000000000..242159f4a1 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml @@ -0,0 +1,220 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-end-to-end-tests</artifactId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-end-to-end-tests-v1.13</artifactId> + <name>Apache InLong - Sort End to End Tests v1.13</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> + <flink.version>1.13.5</flink.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>jdbc</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mysql</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <version>${testcontainers.version}</version> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>clickhouse</artifactId> + <version>${testcontainers.version}</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-dist</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-flink-dependencies-v1.13</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ru.yandex.clickhouse</groupId> + <artifactId>clickhouse-jdbc</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-dist</artifactId> + <version>${project.version}</version> + <destFileName>sort-dist.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql.jdbc.version}</version> + <destFileName>mysql-driver.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-postgres-cdc</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-postgres-cdc.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-kafka</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-kafka.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-mysql-cdc</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-mysql-cdc.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-jdbc</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-jdbc.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + <executions> + <execution> + <id>copy-jars</id> + <goals> + <goal>copy</goal> + </goals> + <phase>pre-integration-test</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <id>end-to-end-tests-v1.13</id> + <phase>integration-test</phase> + <configuration> + <includes> + <include>**/*.*</include> + </includes> + <forkCount>1</forkCount> + <systemPropertyVariables> + <moduleDir>${project.basedir}</moduleDir> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/ClickHouseCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/ClickHouseCase.java similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/ClickHouseCase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/ClickHouseCase.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/my.cnf similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/my.cnf diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/setup.sql similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/setup.sql diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/env/kafka_test_kafka_init.txt similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_kafka_init.txt rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/env/kafka_test_kafka_init.txt diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/env/kafka_test_mysql_init.txt similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/env/kafka_test_mysql_init.txt rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/env/kafka_test_mysql_init.txt diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/clickhouse_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/clickhouse_test.sql similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/clickhouse_test.sql rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/clickhouse_test.sql diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/kafka_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/kafka_test.sql similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/flinkSql/kafka_test.sql rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/kafka_test.sql diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/groupFile/kafka_test.json similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/groupFile/kafka_test.json rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/groupFile/kafka_test.json diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/log4j2-test.properties similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/log4j2-test.properties diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml new file mode 100644 index 0000000000..40d96618a1 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml @@ -0,0 +1,219 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-end-to-end-tests</artifactId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-end-to-end-tests-v1.15</artifactId> + <name>Apache InLong - Sort End to End Tests v1.15</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> + <flink.version>1.15.4</flink.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>jdbc</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <version>${testcontainers.version}</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-dist</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-flink-dependencies-v1.15</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-csv</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-avro</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ru.yandex.clickhouse</groupId> + <artifactId>clickhouse-jdbc</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-dist</artifactId> + <version>${project.version}</version> + <destFileName>sort-dist.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql.jdbc.version}</version> + <destFileName>mysql-driver.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-postgres-cdc-v1.15</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-postgres-cdc.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-starrocks-v1.15</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-starrocks.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + <executions> + <execution> + <id>copy-jars</id> + <goals> + <goal>copy</goal> + </goals> + <phase>pre-integration-test</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <id>end-to-end-tests-v1.15</id> + <phase>integration-test</phase> + <configuration> + <includes> + <include>**/*.*</include> + </includes> + <forkCount>1</forkCount> + <systemPropertyVariables> + <moduleDir>${project.basedir}</moduleDir> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java new file mode 100644 index 0000000000..499cc4b7f2 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests; + +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.JdbcProxy; +import org.apache.inlong.sort.tests.utils.StarRocksContainer; +import org.apache.inlong.sort.tests.utils.TestUtils; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +/** + * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. + * Test flink sql Postgres cdc to StarRocks + */ +public class PostgresToStarRocksTest extends FlinkContainerTestEnv { + + private static final Logger LOG = LoggerFactory.getLogger(PostgresToStarRocksTest.class); + + private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar"); + private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); + private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); + + private static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class); + + private static final String sqlFile; + + // ---------------------------------------------------------------------------------------- + // StarRocks Variables + // ---------------------------------------------------------------------------------------- + private static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks"; + private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks"; + private static final String NEW_STARROCKS_TAG = "latest"; + private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4"; + + static { + try { + sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()).toString(); + buildStarRocksImage(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private static String getNewStarRocksImageName() { + return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG; + } + + public static void buildStarRocksImage() { + GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME); + Startables.deepStart(Stream.of(oldStarRocks)).join(); + oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"), + "/data/deploy/"); + try { + oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh"); + } catch (Exception e) { + e.printStackTrace(); + } + oldStarRocks.getDockerClient() + .commitCmd(oldStarRocks.getContainerId()) + .withRepository(NEW_STARROCKS_REPOSITORY) + .withTag(NEW_STARROCKS_TAG).exec(); + oldStarRocks.stop(); + } + + @ClassRule + public static StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) + .withExposedPorts(9030, 8030, 8040) + .withNetwork(NETWORK) + .withAccessToHost(true) + .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); + + @ClassRule + public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer( + DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres")) + .withUsername("flinkuser") + .withPassword("flinkpw") + .withDatabaseName("test") + .withNetwork(NETWORK) + .withNetworkAliases("postgres") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @Before + public void setup() { + waitUntilJobRunning(Duration.ofSeconds(30)); + initializePostgresTable(); + initializeStarRocksTable(); + } + + private void initializePostgresTable() { + try { + Class.forName(POSTGRES_CONTAINER.getDriverClassName()); + Connection conn = DriverManager + .getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); + Statement stat = conn.createStatement(); + stat.execute( + "CREATE TABLE test_input1 (\n" + + " id SERIAL,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512),\n" + + " PRIMARY KEY(id)\n" + + ");"); + stat.execute( + "ALTER TABLE test_input1 REPLICA IDENTITY FULL; "); + stat.close(); + conn.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void initializeStarRocksTable() { + try (Connection conn = + DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), + STAR_ROCKS.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n" + + " id INT NOT NULL,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512)\n" + + ")\n" + + "PRIMARY KEY(id)\n" + + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\" = \"1\");"); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void teardown() { + if (POSTGRES_CONTAINER != null) { + POSTGRES_CONTAINER.stop(); + } + if (STAR_ROCKS != null) { + STAR_ROCKS.stop(); + } + } + + /** + * Test flink sql postgresql cdc to StarRocks + * + * @throws Exception The exception may throws when execute the case + */ + @Test + public void testPostgresUpdateAndDelete() throws Exception { + submitSQLJob(sqlFile, jdbcJar, postgresJar, mysqlJdbcJar); + waitUntilJobRunning(Duration.ofSeconds(10)); + + // generate input + try (Connection conn = + DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute( + "INSERT INTO test_input1 " + + "VALUES (1,'jacket','water resistent white wind breaker');"); + stat.execute( + "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');"); + stat.execute( + "update test_input1 set name = 'tom' where id = 2;"); + stat.execute( + "delete from test_input1 where id = 1;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + JdbcProxy proxy = + new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(), + STAR_ROCKS.getPassword(), + STAR_ROCKS.getDriverClassName()); + List<String> expectResult = + Arrays.asList("2,tom,Big 2-wheel scooter "); + proxy.checkResultWithTimeout( + expectResult, + "test_output1", + 3, + 60000L); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java similarity index 84% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java index cbe4d9b64f..c328ac951e 100644 --- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -74,7 +74,6 @@ public abstract class FlinkContainerTestEnv extends TestLogger { private static final Logger JM_LOG = LoggerFactory.getLogger(JobMaster.class); private static final Logger TM_LOG = LoggerFactory.getLogger(TaskExecutor.class); - private static final Logger MYSQL_LOG = LoggerFactory.getLogger(MySqlContainer.class); private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnv.class); private static final Path SORT_DIST_JAR = TestUtils.getResource("sort-dist.jar"); @@ -107,29 +106,12 @@ public abstract class FlinkContainerTestEnv extends TestLogger { private static GenericContainer<?> jobManager; private static GenericContainer<?> taskManager; - // ---------------------------------------------------------------------------------------- - // MYSQL Variables - // ---------------------------------------------------------------------------------------- - protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - private static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; - @ClassRule - public static final MySqlContainer MYSQL = - (MySqlContainer) new MySqlContainer() - .withConfigurationOverride("docker/mysql/my.cnf") - .withSetupSQL("docker/mysql/setup.sql") - .withDatabaseName("test") - .withUsername("flinkuser") - .withPassword("flinkpw") - .withUrlParam("allowMultiQueries", "true") - .withNetwork(NETWORK) - .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) - .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG)); @BeforeClass public static void before() { LOG.info("Starting containers..."); jobManager = - new GenericContainer<>("flink:1.13.5-scala_2.11") + new GenericContainer<>("flink:1.15.4-scala_2.12") .withCommand("jobmanager") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) @@ -137,7 +119,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger { .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) .withLogConsumer(new Slf4jLogConsumer(JM_LOG)); taskManager = - new GenericContainer<>("flink:1.13.5-scala_2.11") + new GenericContainer<>("flink:1.15.4-scala_2.12") .withCommand("taskmanager") .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) @@ -162,9 +144,6 @@ public abstract class FlinkContainerTestEnv extends TestLogger { if (taskManager != null) { taskManager.stop(); } - if (MYSQL != null) { - MYSQL.stop(); - } } /** @@ -191,25 +170,6 @@ public abstract class FlinkContainerTestEnv extends TestLogger { } } - public void submitGroupFileJob(String groupFile, Path... jars) - throws IOException, InterruptedException { - final List<String> commands = new ArrayList<>(); - String containerGroupFile = copyToContainerTmpPath(jobManager, groupFile); - commands.add(FLINK_BIN + "/flink run -d"); - commands.add("-c org.apache.inlong.sort.Entrance"); - commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars))); - commands.add("--group.info.file"); - commands.add(containerGroupFile); - - ExecResult execResult = - jobManager.execInContainer("bash", "-c", String.join(" ", commands)); - LOG.info(execResult.getStdout()); - LOG.error(execResult.getStderr()); - if (execResult.getExitCode() != 0) { - throw new AssertionError("Failed when submitting the SQL job."); - } - } - /** * Get {@link RestClusterClient} connected to this FlinkContainer. * diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/JdbcProxy.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java new file mode 100644 index 0000000000..47f0d673c8 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.apache.commons.lang3.StringUtils; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static java.util.stream.Collectors.joining; + +/** + * Docker container for StarRocks. + */ +@SuppressWarnings("rawtypes") +public class StarRocksContainer extends GenericContainer { + + public static final String IMAGE = "starrocks/allin1-ubi"; + public static final Integer STAR_ROCKS_QUERY_PORT = 9030; + public static final Integer STAR_ROCKS_FD_HTTP_PORT = 8030; + public static final Integer STAR_ROCKS_ED_HTTP_PORT = 8040; + + private Map<String, String> urlParameters = new HashMap<>(); + + private String databaseName = "test"; + private String username = "inlong"; + private String password = "inlong"; + + public StarRocksContainer() { + this(StarRocksVersion.V3_0); + } + + public StarRocksContainer(StarRocksVersion version) { + super(DockerImageName.parse(IMAGE + ":" + version.getVersion()).asCompatibleSubstituteFor("starrocks")); + addExposedPort(STAR_ROCKS_QUERY_PORT); + addExposedPort(STAR_ROCKS_FD_HTTP_PORT); + addExposedPort(STAR_ROCKS_ED_HTTP_PORT); + } + + public StarRocksContainer(String imageName) { + super(DockerImageName.parse(imageName).asCompatibleSubstituteFor("starrocks")); + addExposedPort(STAR_ROCKS_QUERY_PORT); + addExposedPort(STAR_ROCKS_FD_HTTP_PORT); + addExposedPort(STAR_ROCKS_ED_HTTP_PORT); + } + + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + private String constructUrlParameters(String startCharacter, String delimiter, String endCharacter) { + String urlParameters = ""; + if (!this.urlParameters.isEmpty()) { + String additionalParameters = this.urlParameters.entrySet().stream() + .map(Object::toString) + .collect(joining(delimiter)); + urlParameters = startCharacter + additionalParameters + endCharacter; + } + return urlParameters; + } + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&", StringUtils.EMPTY); + return "jdbc:mysql://" + + getHost() + + ":" + + getDatabasePort() + + "/" + + databaseName + + additionalUrlParams; + } + + public String getJdbcUrl() { + return getJdbcUrl(databaseName); + } + + public int getDatabasePort() { + return getMappedPort(STAR_ROCKS_QUERY_PORT); + } + + + public String getDatabaseName() { + return databaseName; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + protected String getTestQueryString() { + return "SELECT 1"; + } + + /** MySql version enum. */ + public enum StarRocksVersion { + + V2_5("2.5.8"), + V3_0("3.0.4"), + V3_1("3.1.0-rc01"); + + private String version; + + StarRocksVersion(String version) { + this.version = version; + } + + public String getVersion() { + return version; + } + + @Override + public String toString() { + return "StarRocksVersion{" + "version='" + version + '\'' + '}'; + } + } +} diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/TestUtils.java diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf similarity index 100% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/my.cnf rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql similarity index 100% copy from inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql copy to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/postgresql/setup.sql similarity index 79% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/postgresql/setup.sql index 9ec4b48bbd..cf96436034 100644 --- a/inlong-sort/sort-end-to-end-tests/src/test/resources/docker/mysql/setup.sql +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/postgresql/setup.sql @@ -20,6 +20,9 @@ -- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) -- 2) 'inlong' - all privileges -- -GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; -CREATE USER 'inlong' IDENTIFIED BY 'inlong'; -GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%'; +CREATE USER inlong PASSWORD 'inlong'; +ALTER ROLE inlong REPLICATION LOGIN; +GRANT CONNECT ON DATABASE test TO inlong; +GRANT SELECT,INSERT,UPDATE,DELETE ON ALL TABLES IN SCHEMA PUBLIC TO inlong; +GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC TO inlong; +ALTER DEFAULT PRIVILEGES IN SCHEMA PUBLIC GRANT SELECT ON TABLES TO inlong; \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh new file mode 100644 index 0000000000..fc37959f77 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#!/bin/bash + +log_stdin() +{ + echo "[`date`] $@" >&1 +} +log_stdin `cat /etc/hosts` + +cp /etc/hosts /etc/hosts.temp +sed -i 'N;$!P;D' /etc/hosts.temp +cat /etc/hosts.temp > /etc/hosts + +log_stdin `cat /etc/hosts` +sleep 2 +# Start FE. +cd $SR_HOME/fe/bin/ + +log_stdin "Start FE" +./start_fe.sh --host_type FQDN --daemon + +# Start BE. +log_stdin "Start BE" +cd $SR_HOME/be/bin/ +./start_be.sh --daemon + +# Sleep until the cluster starts. +sleep 20; + +# Fetch fqdn with the command suggested by AWS official doc: https://docs.aws.amazon.com/managedservices/latest/userguide/find-FQDN.html +MYFQDN=`hostname --fqdn` +log_stdin "Register BE ${MYFQDN} to FE" +mysql -uroot -h${MYFQDN} -P 9030 -e "alter system add backend '${MYFQDN}:9050';" + +# Sleep until the BE register to FE. +sleep 5; + +log_stdin "init starrocks db begin..." +mysql -uroot -h${MYFQDN} -P 9030 <<EOF +CREATE DATABASE IF NOT EXISTS test; +USE test; +CREATE ROLE inlong; +GRANT CREATE TABLE, ALTER, DROP ON DATABASE test TO ROLE inlong WITH GRANT OPTION; +GRANT SELECT, ALTER, INSERT, UPDATE, DELETE ON ALL TABLES IN DATABASE test TO ROLE 'inlong' WITH GRANT OPTION; +CREATE USER 'inlong'@'%' IDENTIFIED BY 'inlong' DEFAULT ROLE 'inlong'; +CREATE USER 'flinkuser'@'%' IDENTIFIED BY 'flinkpw' DEFAULT ROLE 'inlong'; +EOF +log_stdin "init starrocks db end!" + +# health check the entire stack end-to-end and exit on failure. +while sleep 10; do + PROCESS_STATUS=`mysql -uroot -h127.0.0.1 -P 9030 -e "show backends\G" |grep "Alive: true"` + if [ -z "$PROCESS_STATUS" ]; then + log_stdin "service has exited" + exit 1; + fi; + log_stdin $PROCESS_STATUS +done diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql new file mode 100644 index 0000000000..64bfe3f606 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql @@ -0,0 +1,39 @@ +CREATE TABLE test_input1 ( + `id` INT primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'postgres-cdc-inlong', + 'hostname' = 'postgres', + 'port' = '5432', + 'username' = 'flinkuser', + 'password' = 'flinkpw', + 'database-name' = 'test', + 'table-name' = 'test_input1', + 'schema-name' = 'public', + 'decoding.plugin.name' = 'pgoutput', + 'slot.name' = 'inlong_slot', + 'debezium.slot.name' = 'inlong_slot' +); + +CREATE TABLE test_output1 ( + `id` INT primary key, + name STRING, + description STRING +) WITH ( + 'connector' = 'starrocks-inlong', + 'jdbc-url' = 'jdbc:mysql://starrocks:9030', + 'load-url'='starrocks:8030', + 'database-name'='test', + 'table-name' = 'test_output1', + 'username' = 'inlong', + 'password' = 'inlong', + 'sink.buffer-flush.interval-ms' = '5000', + 'sink.properties.column_separator' = '\x01', + 'sink.properties.row_delimiter' = '\x02' +); + +INSERT INTO test_output1 select * from test_input1; + + + diff --git a/inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties similarity index 71% rename from inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties index 9f14c92cb8..cc59d85482 100644 --- a/inlong-sort/sort-end-to-end-tests/src/test/resources/log4j2-test.properties +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties @@ -41,12 +41,17 @@ appender.kafka.fileName = target/logs/kafka.log appender.kafka.layout.type = PatternLayout appender.kafka.layout.pattern = - %m%n -appender.mysql.type = File -appender.mysql.name = mysql -appender.mysql.fileName = target/logs/mysql.log -appender.mysql.layout.type = PatternLayout -appender.mysql.layout.pattern = - %m%n +appender.starrocks.type = File +appender.starrocks.name = starrocks +appender.starrocks.fileName = target/logs/starrocks.log +appender.starrocks.layout.type = PatternLayout +appender.starrocks.layout.pattern = - %m%n +appender.postgres.type = File +appender.postgres.name = postgres +appender.postgres.fileName = target/logs/postgres.log +appender.postgres.layout.type = PatternLayout +appender.postgres.layout.pattern = - %m%n logger.jm=INFO, jobmanager logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster @@ -56,11 +61,12 @@ logger.tm=INFO, taskmanager logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor logger.tm.additivity=false -logger.mysql=INFO, mysql -logger.mysql.name=org.apache.inlong.sort.tests.utils.MySqlContainer -logger.mysql.additivity=false +logger.starrocks=INFO, starrocks +logger.starrocks.name=org.apache.inlong.sort.tests.StarRocksContainer +logger.starrocks.additivity=false + +logger.postgres=INFO, postgres +logger.postgres.name=org.testcontainers.containers.PostgreSQLContainer +logger.postgres.additivity=false -logger.kafka=INFO, kafkaserver -logger.kafka.name=org.apache.inlong.sort.tests.KafkaE2ECase -logger.kafka.additivity=false