This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8903184b2a [INLONG-9866][Sort] Add end to end test case(redis to redis) for sort-connector-redis-v1.15. (#9869) 8903184b2a is described below commit 8903184b2aba0792cd1f98051a179da025bda347 Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Tue Apr 2 11:29:20 2024 +0800 [INLONG-9866][Sort] Add end to end test case(redis to redis) for sort-connector-redis-v1.15. (#9869) --- .github/workflows/ci_ut.yml | 2 +- inlong-sort/sort-end-to-end-tests/pom.xml | 3 + .../sort-end-to-end-tests-v1.15/pom.xml | 21 ++- ...afkaE2EITCase.java => Kafka2StarRocksTest.java} | 11 +- ...RocksITCase.java => Mongodb2StarRocksTest.java} | 8 +- ...ToRocksITCase.java => Mysql2StarRocksTest.java} | 11 +- ...ocksITCase.java => Postgres2StarRocksTest.java} | 35 ++-- .../apache/inlong/sort/tests/RedisToRedisTest.java | 141 ++++++++++++++ ...cksITCase.java => Sqlserver2StarRocksTest.java} | 9 +- .../sort/tests/utils/FlinkContainerTestEnv.java | 3 - .../inlong/sort/tests/utils/RedisContainer.java | 37 ++++ .../inlong/sort/tests/utils/StarRocksManager.java | 3 +- .../test/resources/docker/starrocks/start_fe_be.sh | 2 +- .../src/test/resources/flinkSql/redis_test.sql | 208 +++++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 12 +- .../sort-flink-v1.15/sort-connectors/redis/pom.xml | 63 +++++-- 16 files changed, 495 insertions(+), 74 deletions(-) diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index 2ed7e4adc9..a7b75dfdce 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -101,7 +101,7 @@ jobs: CI: false - name: Unit test with Maven - run: mvn --batch-mode --update-snapshots -e -V test + run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13 env: CI: false diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml b/inlong-sort/sort-end-to-end-tests/pom.xml index 33057433cf..740c300947 100644 --- a/inlong-sort/sort-end-to-end-tests/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/pom.xml @@ -45,6 +45,9 @@ </profile> <profile> <id>v1.15</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> <modules> <module>sort-end-to-end-tests-v1.15</module> </modules> diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml index 7e1904e579..f051354aab 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml @@ -152,6 +152,11 @@ <artifactId>clickhouse-jdbc</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -233,6 +238,14 @@ <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> + <artifactItem> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-redis-v1.15</artifactId> + <version>${project.version}</version> + <destFileName>sort-connector-redis.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> </artifactItems> </configuration> <executions> @@ -241,7 +254,7 @@ <goals> <goal>copy</goal> </goals> - <phase>pre-integration-test</phase> + <phase>validate</phase> </execution> </executions> </plugin> @@ -273,6 +286,12 @@ <skip>true</skip> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${plugin.surefire.version}</version> + </plugin> </plugins> </build> diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java similarity index 94% rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java index 1399fe2f6f..9698711825 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java @@ -56,16 +56,15 @@ import java.util.Objects; import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage; import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; /** * End-to-end tests for sort-connector-kafka uber jar. */ -public class KafkaE2EITCase extends FlinkContainerTestEnv { +public class Kafka2StarRocksTest extends FlinkContainerTestEnv { - private static final Logger LOG = LoggerFactory.getLogger(KafkaE2EITCase.class); + private static final Logger LOG = LoggerFactory.getLogger(Kafka2StarRocksTest.class); public static final Logger MYSQL_LOG = LoggerFactory.getLogger(MySqlContainer.class); @@ -81,9 +80,8 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv { static { try { URI kafkaSqlFile = - Objects.requireNonNull(KafkaE2EITCase.class.getResource("/flinkSql/kafka_test.sql")).toURI(); + Objects.requireNonNull(Kafka2StarRocksTest.class.getResource("/flinkSql/kafka_test.sql")).toURI(); sqlFile = Paths.get(kafkaSqlFile).toString(); - buildStarRocksImage(); } catch (URISyntaxException e) { throw new RuntimeException(e); } @@ -102,7 +100,6 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv { (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) .withExposedPorts(9030, 8030, 8040) .withNetwork(NETWORK) - .withAccessToHost(true) .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); @@ -178,7 +175,7 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv { } private String getCreateStatement(String fileName, Map<String, Object> properties) { - URL url = Objects.requireNonNull(KafkaE2EITCase.class.getResource("/env/" + fileName)); + URL url = Objects.requireNonNull(Kafka2StarRocksTest.class.getResource("/env/" + fileName)); try { Path file = Paths.get(url.toURI()); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MongodbToStarRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java similarity index 96% rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MongodbToStarRocksITCase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java index f07c5145fb..ef3a1dbe29 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MongodbToStarRocksITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java @@ -60,9 +60,9 @@ import static com.mongodb.client.model.Updates.*; * End-to-end tests for sort-connector-mongodb-cdc-v1.15 uber jar. * Test flink sql Mongodb cdc to StarRocks */ -public class MongodbToStarRocksITCase extends FlinkContainerTestEnv { +public class Mongodb2StarRocksTest extends FlinkContainerTestEnv { - private static final Logger LOG = LoggerFactory.getLogger(MongodbToStarRocksITCase.class); + private static final Logger LOG = LoggerFactory.getLogger(Mongodb2StarRocksTest.class); private static final Path mongodbJar = TestUtils.getResource("sort-connector-mongodb-cdc.jar"); private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); @@ -82,9 +82,8 @@ public class MongodbToStarRocksITCase extends FlinkContainerTestEnv { static { try { - sqlFile = Paths.get(PostgresToStarRocksITCase.class.getResource("/flinkSql/mongodb_test.sql").toURI()) + sqlFile = Paths.get(Postgres2StarRocksTest.class.getResource("/flinkSql/mongodb_test.sql").toURI()) .toString(); - buildStarRocksImage(); } catch (URISyntaxException e) { throw new RuntimeException(e); } @@ -116,7 +115,6 @@ public class MongodbToStarRocksITCase extends FlinkContainerTestEnv { (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) .withExposedPorts(9030, 8030, 8040) .withNetwork(NETWORK) - .withAccessToHost(true) .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java similarity index 93% rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java index 51501772a9..06b7034fae 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java @@ -44,7 +44,6 @@ import java.util.List; import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage; import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; @@ -52,9 +51,9 @@ import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStar * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. * Test flink sql Mysql cdc to StarRocks */ -public class MysqlToRocksITCase extends FlinkContainerTestEnv { +public class Mysql2StarRocksTest extends FlinkContainerTestEnv { - private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksITCase.class); + private static final Logger LOG = LoggerFactory.getLogger(Mysql2StarRocksTest.class); private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar"); private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); @@ -64,19 +63,17 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv { static { try { sqlFile = - Paths.get(MysqlToRocksITCase.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString(); - buildStarRocksImage(); + Paths.get(Mysql2StarRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString(); } catch (URISyntaxException e) { throw new RuntimeException(e); } } @ClassRule - public static StarRocksContainer STAR_ROCKS = + public static final StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) .withExposedPorts(9030, 8030, 8040) .withNetwork(NETWORK) - .withAccessToHost(true) .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java similarity index 88% rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksITCase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java index 1acb0c9979..3f1d4f4565 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java @@ -43,19 +43,16 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; +import static org.apache.inlong.sort.tests.utils.StarRocksManager.*; + /** * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. * Test flink sql Postgres cdc to StarRocks */ -public class PostgresToStarRocksITCase extends FlinkContainerTestEnv { - - private static final Logger LOG = LoggerFactory.getLogger(PostgresToStarRocksITCase.class); +public class Postgres2StarRocksTest extends FlinkContainerTestEnv { + private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class); + private static final Logger LOG = LoggerFactory.getLogger(Postgres2StarRocksTest.class); private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar"); private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); @@ -63,23 +60,12 @@ public class PostgresToStarRocksITCase extends FlinkContainerTestEnv { static { try { - sqlFile = Paths.get(PostgresToStarRocksITCase.class.getResource("/flinkSql/postgres_test.sql").toURI()) + sqlFile = Paths.get(Postgres2StarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()) .toString(); - buildStarRocksImage(); } catch (URISyntaxException e) { throw new RuntimeException(e); } } - - @ClassRule - public static StarRocksContainer STAR_ROCKS = - (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) - .withExposedPorts(9030, 8030, 8040) - .withNetwork(NETWORK) - .withAccessToHost(true) - .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) - .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); - @ClassRule public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer( DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres")) @@ -88,7 +74,14 @@ public class PostgresToStarRocksITCase extends FlinkContainerTestEnv { .withDatabaseName("test") .withNetwork(NETWORK) .withNetworkAliases("postgres") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withLogConsumer(new Slf4jLogConsumer(PG_LOG)); + @ClassRule + public static final StarRocksContainer STAR_ROCKS = + (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) + .withExposedPorts(9030, 8030, 8040) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); @Before public void setup() { diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java new file mode 100644 index 0000000000..2332f2dcb8 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests; + +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; +import org.apache.inlong.sort.tests.utils.RedisContainer; +import org.apache.inlong.sort.tests.utils.TestUtils; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.Jedis; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +public class RedisToRedisTest extends FlinkContainerTestEnv { + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + private static final Path redisJar = TestUtils.getResource("sort-connector-redis.jar"); + private static final String sqlFile; + private static Jedis jedisSource; + private static Jedis jedisSink; + + static { + try { + sqlFile = Paths.get(RedisToRedisTest.class.getResource("/flinkSql/redis_test.sql").toURI()) + .toString(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @ClassRule + public static final RedisContainer REDIS_CONTAINER_SOURCE = new RedisContainer( + DockerImageName.parse("redis:6.2.14")) + .withExposedPorts(6379) + .withNetwork(NETWORK) + .withNetworkAliases("redis_source") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final RedisContainer REDIS_CONTAINER_SINK = new RedisContainer( + DockerImageName.parse("redis:6.2.14")) + .withExposedPorts(6379) + .withNetwork(NETWORK) + .withNetworkAliases("redis_sink") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + @Before + public void setup() { + waitUntilJobRunning(Duration.ofSeconds(30)); + initializeRedisTable(); + } + + private void initializeRedisTable() { + + int sourcePort = REDIS_CONTAINER_SOURCE.getRedisPort(); + int sinkPort = REDIS_CONTAINER_SINK.getRedisPort(); + + jedisSource = new Jedis("127.0.0.1", sourcePort); + jedisSink = new Jedis("127.0.0.1", sinkPort); + + jedisSource.set("1", "value_1"); + jedisSource.set("2", "value_2"); + + jedisSource.hset("3", "1", "value_1"); + jedisSource.hset("3", "2", "value_2"); + + // ZREVRANK TEST + jedisSource.zadd("rank", 10, "1"); + jedisSource.zadd("rank", 20, "2"); + jedisSource.zadd("rank", 30, "3"); + + // ZSCORETEST TEST + jedisSource.zadd("rank_score", 10, "1"); + jedisSource.zadd("rank_score", 20, "2"); + jedisSource.zadd("rank_score", 30, "3"); + + } + + @AfterClass + public static void teardown() { + REDIS_CONTAINER_SOURCE.stop(); + REDIS_CONTAINER_SINK.stop(); + } + + /** + * Test flink sql postgresql cdc to StarRocks + * + * @throws Exception The exception may throws when execute the case + */ + @Test + public void testRedisSourceAndSink() throws Exception { + submitSQLJob(sqlFile, redisJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("value_1_1", jedisSink.get("1_1")); + assertEquals("value_2_2", jedisSink.get("2_2")); + }); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("value_1", jedisSink.hget("3_3", "1")); + assertEquals("value_2", jedisSink.hget("3_3", "2")); + }); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("10.0", jedisSink.hget("rank_score_test", "1")); + assertEquals("20.0", jedisSink.hget("rank_score_test", "2")); + assertEquals("30.0", jedisSink.hget("rank_score_test", "3")); + }); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("2", jedisSink.hget("rank_test", "1")); + assertEquals("1", jedisSink.hget("rank_test", "2")); + assertEquals("0", jedisSink.hget("rank_test", "3")); + }); + } + +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java similarity index 95% rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksITCase.java rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java index b44f17d018..2204c8b9e0 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksITCase.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java @@ -45,13 +45,12 @@ import java.util.List; import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS; import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG; -import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage; import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName; import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable; -public class SqlserverToStarRocksITCase extends FlinkContainerTestEnv { +public class Sqlserver2StarRocksTest extends FlinkContainerTestEnv { - private static final Logger LOG = LoggerFactory.getLogger(SqlserverToStarRocksITCase.class); + private static final Logger LOG = LoggerFactory.getLogger(Sqlserver2StarRocksTest.class); private static final Path sqlserverJar = TestUtils.getResource("sort-connector-sqlserver-cdc.jar"); private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar"); @@ -62,9 +61,8 @@ public class SqlserverToStarRocksITCase extends FlinkContainerTestEnv { static { try { - sqlFile = Paths.get(SqlserverToStarRocksITCase.class.getResource("/flinkSql/sqlserver_test.sql").toURI()) + sqlFile = Paths.get(Sqlserver2StarRocksTest.class.getResource("/flinkSql/sqlserver_test.sql").toURI()) .toString(); - buildStarRocksImage(); } catch (URISyntaxException e) { throw new RuntimeException(e); } @@ -75,7 +73,6 @@ public class SqlserverToStarRocksITCase extends FlinkContainerTestEnv { (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName()) .withExposedPorts(9030, 8030, 8040) .withNetwork(NETWORK) - .withAccessToHost(true) .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java index 2426c57ae4..35688e4f8f 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -177,9 +177,6 @@ public abstract class FlinkContainerTestEnv extends TestLogger { * <p>This method lazily initializes the REST client on-demand. */ public RestClusterClient<StandaloneClusterId> getRestClusterClient() { - if (restClusterClient != null) { - return restClusterClient; - } checkState( jobManager.isRunning(), "Cluster client should only be retrieved for a running cluster"); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/RedisContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/RedisContainer.java new file mode 100644 index 0000000000..6b7f462591 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/RedisContainer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class RedisContainer extends GenericContainer<RedisContainer> { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("redis"); + + public static final Integer REDIS_PORT = 6379; + + public RedisContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + } + public int getRedisPort() { + return getMappedPort(REDIS_PORT); + } + +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java index 3c60a4b3eb..4ccaacb523 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java @@ -39,7 +39,8 @@ public class StarRocksManager { private static final String NEW_STARROCKS_TAG = "latest"; private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4"; public static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class); - public static void buildStarRocksImage() { + + static { GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME); Startables.deepStart(Stream.of(oldStarRocks)).join(); oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"), diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh index fc37959f77..2a8743f9fc 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh @@ -63,7 +63,7 @@ log_stdin "init starrocks db end!" # health check the entire stack end-to-end and exit on failure. while sleep 10; do - PROCESS_STATUS=`mysql -uroot -h127.0.0.1 -P 9030 -e "show backends\G" |grep "Alive: true"` + PROCESS_STATUS=`mysql -uroot -h${MYFQDN} -P 9030 -e "show backends\G" |grep "Alive: true"` if [ -z "$PROCESS_STATUS" ]; then log_stdin "service has exited" exit 1; diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/redis_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/redis_test.sql new file mode 100644 index 0000000000..a0e56230ed --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/redis_test.sql @@ -0,0 +1,208 @@ +CREATE TABLE dim_get +( + aaa varchar, + bbb varchar +) WITH ( + 'connector' = 'redis-inlong', + 'command' = 'get', + 'host' = 'redis_source', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000' + ); + +create table source_get +( + aaa varchar, + proctime as procTime() +) with ('connector' = 'datagen', 'rows-per-second' = '1', + 'fields.aaa.kind' = 'sequence', 'fields.aaa.start' = '1', 'fields.aaa.end' = '2' + ); +CREATE TABLE sink_get +( + aaa varchar, + bbb varchar, + PRIMARY KEY (`aaa`) NOT ENFORCED +) WITH ( + 'connector' = 'redis-inlong', + 'sink.batch-size' = '1', + 'format' = 'csv', + 'data-type' = 'PLAIN', + 'redis-mode' = 'standalone', + 'host' = 'redis_sink', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000' + ); + + +insert into sink_get + +select concat_ws('_', s.aaa, s.aaa), concat_ws('_', d.bbb, s.aaa) +from source_get s + left join dim_get for system_time as of s.proctime as d + on d.aaa = s.aaa; + + + +CREATE TABLE dim_hget +( + + aaa varchar, + bbb varchar +) WITH ( + 'connector' = 'redis-inlong', + 'command' = 'hget', + 'host' = 'redis_source', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000', + 'additional.key' = '3' + ); + +create table source_hget +( + aaa varchar, + bbb varchar, + proctime as procTime() +) with ('connector' = 'datagen', 'rows-per-second' = '1', + 'fields.aaa.kind' = 'sequence', 'fields.aaa.start' = '1', 'fields.aaa.end' = '2' + ); +CREATE TABLE sink_hget +( + + aaa varchar, + bbb varchar, + ccc varchar, + PRIMARY KEY (`aaa`) NOT ENFORCED +) WITH ( + 'connector' = 'redis-inlong', + 'sink.batch-size' = '1', + 'format' = 'csv', + 'data-type' = 'HASH', + 'redis-mode' = 'standalone', + 'host' = 'redis_sink', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000' + ); + +insert into sink_hget +select '3_3', d.aaa, d.bbb +from source_hget s + left join dim_hget for system_time as of s.proctime as d + on d.aaa = s.aaa and d.bbb = s.bbb; + +CREATE TABLE dim_zrevrank +( + member_test varchar, + member_rank BIGINT +) WITH ( + 'connector' = 'redis-inlong', + 'command' = 'zrevrank', + 'host' = 'redis_source', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000', + 'additional.key' = 'rank' + ); + +create table source_zrevrank +( + member_test varchar, + proctime as procTime() +) with ('connector' = 'datagen', 'rows-per-second' = '1', + 'fields.member_test.kind' = 'sequence', 'fields.member_test.start' = '1', 'fields.member_test.end' = '3' + ); +CREATE TABLE sink_zrevrank +( + + aaa varchar, + bbb varchar, + ccc BIGINT, + PRIMARY KEY(`aaa`) NOT ENFORCED +) WITH ( + 'connector' = 'redis-inlong', + 'sink.batch-size' = '1', + 'format' = 'csv', + 'data-type' = 'HASH', + 'redis-mode' = 'standalone', + 'host' = 'redis_sink', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000' + ); + +insert into sink_zrevrank +select 'rank_test', s.member_test, d.member_rank +from source_zrevrank s + left join dim_zrevrank for system_time as of s.proctime as d + on d.member_test = s.member_test; + + + +CREATE TABLE dim_zscore +( + + member_test varchar, + score double +) WITH ( + 'connector' = 'redis-inlong', + 'command' = 'zscore', + 'host' = 'redis_source', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000', + 'additional.key' = 'rank_score' + ); + +create table source_zscore +( + member_test varchar, + proctime as procTime() +) with ('connector' = 'datagen', 'rows-per-second' = '1', + 'fields.member_test.kind' = 'sequence', 'fields.member_test.start' = '1', 'fields.member_test.end' = '3' + ); +CREATE TABLE sink_zscore +( + + aaa + varchar, + bbb + varchar, + ccc + double, + PRIMARY KEY(`aaa`) NOT ENFORCED +) WITH ( + 'connector' = 'redis-inlong', + 'sink.batch-size' = '1', + 'format' = 'csv', + 'data-type' = 'HASH', + 'redis-mode' = 'standalone', + 'host' = 'redis_sink', + 'port' = '6379', + 'maxIdle' = '8', + 'minIdle' = '1', + 'maxTotal' = '2', + 'timeout' = '2000' + ); + +insert into sink_zscore +select 'rank_score_test', d.member_test, d.score +from source_zscore s + left join dim_zscore for system_time as of s.proctime as d + on d.member_test = s.member_test; \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties index cc59d85482..8b0c655831 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties @@ -53,6 +53,12 @@ appender.postgres.fileName = target/logs/postgres.log appender.postgres.layout.type = PatternLayout appender.postgres.layout.pattern = - %m%n +appender.redis.type = File +appender.redis.name = redis +appender.redis.fileName = target/logs/redis.log +appender.redis.layout.type = PatternLayout +appender.redis.layout.pattern = - %m%n + logger.jm=INFO, jobmanager logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster logger.jm.additivity=false @@ -62,11 +68,15 @@ logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor logger.tm.additivity=false logger.starrocks=INFO, starrocks -logger.starrocks.name=org.apache.inlong.sort.tests.StarRocksContainer +logger.starrocks.name=org.apache.inlong.sort.tests.utils.StarRocksContainer logger.starrocks.additivity=false logger.postgres=INFO, postgres logger.postgres.name=org.testcontainers.containers.PostgreSQLContainer logger.postgres.additivity=false +logger.redis=INFO, redis +logger.redis.name=org.apache.inlong.sort.tests.utils.RedisContainer +logger.redis.additivity=false + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml index c37b5b17a6..651cb8e910 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml @@ -27,31 +27,24 @@ </parent> <artifactId>sort-connector-redis-v1.15</artifactId> + <packaging>jar</packaging> <name>Apache InLong - Sort-connector-redis</name> <properties> <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir> </properties> <dependencies> - <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-jackson --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-jackson</artifactId> <version>${flink.shaded.jackson}</version> + <scope>test</scope> </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${flink.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.bahir</groupId> @@ -99,7 +92,6 @@ <version>${embedded.redis.version}</version> <scope>test</scope> </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> @@ -114,16 +106,9 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> <version>${flink.version}</version> - <type>test-jar</type> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -131,7 +116,6 @@ <version>${flink.version}</version> <scope>test</scope> </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> @@ -149,6 +133,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> + <version>${plugin.shade.version}</version> <executions> <execution> <id>shade-flink</id> @@ -157,6 +142,22 @@ </goals> <phase>package</phase> <configuration> + <artifactSet> + <includes> + <include>org.apache.inlong:*</include> + <include>io.streamnative.connectors:flink-protobuf</include> + <include>org.apache.flink:flink-connector-base</include> + <include>org.apache.flink:flink-shaded-guava</include> + <include>org.apache.bahir:flink-connector-redis_${flink.scala.binary.version}</include> + <include>org.apache.commons:commons-lang3</include> + <include>org.apache.commons:commons-pool2</include> + <include>com.google.protobuf:*</include> + <include>javax.*:*</include> + <include>org.lz4*:*</include> + <include>org.slf4j:jul-to-slf4j</include> + <include>redis.clients:jedis</include> + </includes> + </artifactSet> <filters> <filter> <artifact>org.apache.inlong:sort-connector-*</artifact> @@ -167,6 +168,28 @@ </includes> </filter> </filters> + <relocations> + <relocation> + <pattern>org.apache.commons.pool2</pattern> + <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.commons.pool2</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.commons.logging</pattern> + <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.commons.logging</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.commons.lang3</pattern> + <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.commons.lang3</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.inlong.sort.base</shadedPattern> + </relocation> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.inlong.sort.redis.shaded.com.google</shadedPattern> + </relocation> + </relocations> </configuration> </execution> </executions>