This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8903184b2a [INLONG-9866][Sort] Add end to end test case(redis to 
redis) for sort-connector-redis-v1.15. (#9869)
8903184b2a is described below

commit 8903184b2aba0792cd1f98051a179da025bda347
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Tue Apr 2 11:29:20 2024 +0800

    [INLONG-9866][Sort] Add end to end test case(redis to redis) for 
sort-connector-redis-v1.15. (#9869)
---
 .github/workflows/ci_ut.yml                        |   2 +-
 inlong-sort/sort-end-to-end-tests/pom.xml          |   3 +
 .../sort-end-to-end-tests-v1.15/pom.xml            |  21 ++-
 ...afkaE2EITCase.java => Kafka2StarRocksTest.java} |  11 +-
 ...RocksITCase.java => Mongodb2StarRocksTest.java} |   8 +-
 ...ToRocksITCase.java => Mysql2StarRocksTest.java} |  11 +-
 ...ocksITCase.java => Postgres2StarRocksTest.java} |  35 ++--
 .../apache/inlong/sort/tests/RedisToRedisTest.java | 141 ++++++++++++++
 ...cksITCase.java => Sqlserver2StarRocksTest.java} |   9 +-
 .../sort/tests/utils/FlinkContainerTestEnv.java    |   3 -
 .../inlong/sort/tests/utils/RedisContainer.java    |  37 ++++
 .../inlong/sort/tests/utils/StarRocksManager.java  |   3 +-
 .../test/resources/docker/starrocks/start_fe_be.sh |   2 +-
 .../src/test/resources/flinkSql/redis_test.sql     | 208 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  12 +-
 .../sort-flink-v1.15/sort-connectors/redis/pom.xml |  63 +++++--
 16 files changed, 495 insertions(+), 74 deletions(-)

diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml
index 2ed7e4adc9..a7b75dfdce 100644
--- a/.github/workflows/ci_ut.yml
+++ b/.github/workflows/ci_ut.yml
@@ -101,7 +101,7 @@ jobs:
           CI: false
 
       - name: Unit test with Maven
-        run: mvn --batch-mode --update-snapshots -e -V test
+        run: mvn --batch-mode --update-snapshots -e -V test -pl 
!:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13
         env:
           CI: false
 
diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml 
b/inlong-sort/sort-end-to-end-tests/pom.xml
index 33057433cf..740c300947 100644
--- a/inlong-sort/sort-end-to-end-tests/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/pom.xml
@@ -45,6 +45,9 @@
         </profile>
         <profile>
             <id>v1.15</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
             <modules>
                 <module>sort-end-to-end-tests-v1.15</module>
             </modules>
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 7e1904e579..f051354aab 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -152,6 +152,11 @@
             <artifactId>clickhouse-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -233,6 +238,14 @@
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-redis-v1.15</artifactId>
+                            <version>${project.version}</version>
+                            
<destFileName>sort-connector-redis.jar</destFileName>
+                            <type>jar</type>
+                            
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
                     </artifactItems>
                 </configuration>
                 <executions>
@@ -241,7 +254,7 @@
                         <goals>
                             <goal>copy</goal>
                         </goals>
-                        <phase>pre-integration-test</phase>
+                        <phase>validate</phase>
                     </execution>
                 </executions>
             </plugin>
@@ -273,6 +286,12 @@
                     <skip>true</skip>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${plugin.surefire.version}</version>
+            </plugin>
         </plugins>
     </build>
 
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
similarity index 94%
rename from 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
rename to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
index 1399fe2f6f..9698711825 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
@@ -56,16 +56,15 @@ import java.util.Objects;
 
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
 
 /**
  * End-to-end tests for sort-connector-kafka uber jar.
  */
-public class KafkaE2EITCase extends FlinkContainerTestEnv {
+public class Kafka2StarRocksTest extends FlinkContainerTestEnv {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaE2EITCase.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(Kafka2StarRocksTest.class);
 
     public static final Logger MYSQL_LOG = 
LoggerFactory.getLogger(MySqlContainer.class);
 
@@ -81,9 +80,8 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
     static {
         try {
             URI kafkaSqlFile =
-                    
Objects.requireNonNull(KafkaE2EITCase.class.getResource("/flinkSql/kafka_test.sql")).toURI();
+                    
Objects.requireNonNull(Kafka2StarRocksTest.class.getResource("/flinkSql/kafka_test.sql")).toURI();
             sqlFile = Paths.get(kafkaSqlFile).toString();
-            buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
@@ -102,7 +100,6 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
             (StarRocksContainer) new 
StarRocksContainer(getNewStarRocksImageName())
                     .withExposedPorts(9030, 8030, 8040)
                     .withNetwork(NETWORK)
-                    .withAccessToHost(true)
                     .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
                     .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
@@ -178,7 +175,7 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
     }
 
     private String getCreateStatement(String fileName, Map<String, Object> 
properties) {
-        URL url = 
Objects.requireNonNull(KafkaE2EITCase.class.getResource("/env/" + fileName));
+        URL url = 
Objects.requireNonNull(Kafka2StarRocksTest.class.getResource("/env/" + 
fileName));
 
         try {
             Path file = Paths.get(url.toURI());
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MongodbToStarRocksITCase.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java
similarity index 96%
rename from 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MongodbToStarRocksITCase.java
rename to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java
index f07c5145fb..ef3a1dbe29 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MongodbToStarRocksITCase.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mongodb2StarRocksTest.java
@@ -60,9 +60,9 @@ import static com.mongodb.client.model.Updates.*;
  * End-to-end tests for sort-connector-mongodb-cdc-v1.15 uber jar.
  * Test flink sql Mongodb cdc to StarRocks
  */
-public class MongodbToStarRocksITCase extends FlinkContainerTestEnv {
+public class Mongodb2StarRocksTest extends FlinkContainerTestEnv {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MongodbToStarRocksITCase.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(Mongodb2StarRocksTest.class);
 
     private static final Path mongodbJar = 
TestUtils.getResource("sort-connector-mongodb-cdc.jar");
     private static final Path jdbcJar = 
TestUtils.getResource("sort-connector-starrocks.jar");
@@ -82,9 +82,8 @@ public class MongodbToStarRocksITCase extends 
FlinkContainerTestEnv {
 
     static {
         try {
-            sqlFile = 
Paths.get(PostgresToStarRocksITCase.class.getResource("/flinkSql/mongodb_test.sql").toURI())
+            sqlFile = 
Paths.get(Postgres2StarRocksTest.class.getResource("/flinkSql/mongodb_test.sql").toURI())
                     .toString();
-            buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
@@ -116,7 +115,6 @@ public class MongodbToStarRocksITCase extends 
FlinkContainerTestEnv {
             (StarRocksContainer) new 
StarRocksContainer(getNewStarRocksImageName())
                     .withExposedPorts(9030, 8030, 8040)
                     .withNetwork(NETWORK)
-                    .withAccessToHost(true)
                     .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
                     .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
similarity index 93%
rename from 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
rename to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
index 51501772a9..06b7034fae 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
@@ -44,7 +44,6 @@ import java.util.List;
 
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
 
@@ -52,9 +51,9 @@ import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStar
  * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
  * Test flink sql Mysql cdc to StarRocks
  */
-public class MysqlToRocksITCase extends FlinkContainerTestEnv {
+public class Mysql2StarRocksTest extends FlinkContainerTestEnv {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MysqlToRocksITCase.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(Mysql2StarRocksTest.class);
 
     private static final Path mysqlJar = 
TestUtils.getResource("sort-connector-mysql-cdc.jar");
     private static final Path jdbcJar = 
TestUtils.getResource("sort-connector-starrocks.jar");
@@ -64,19 +63,17 @@ public class MysqlToRocksITCase extends 
FlinkContainerTestEnv {
     static {
         try {
             sqlFile =
-                    
Paths.get(MysqlToRocksITCase.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
-            buildStarRocksImage();
+                    
Paths.get(Mysql2StarRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
     }
 
     @ClassRule
-    public static StarRocksContainer STAR_ROCKS =
+    public static final StarRocksContainer STAR_ROCKS =
             (StarRocksContainer) new 
StarRocksContainer(getNewStarRocksImageName())
                     .withExposedPorts(9030, 8030, 8040)
                     .withNetwork(NETWORK)
-                    .withAccessToHost(true)
                     .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
                     .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksITCase.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java
similarity index 88%
rename from 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksITCase.java
rename to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java
index 1acb0c9979..3f1d4f4565 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksITCase.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java
@@ -43,19 +43,16 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.*;
+
 /**
  * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
  * Test flink sql Postgres cdc to StarRocks
  */
-public class PostgresToStarRocksITCase extends FlinkContainerTestEnv {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresToStarRocksITCase.class);
+public class Postgres2StarRocksTest extends FlinkContainerTestEnv {
 
+    private static final Logger PG_LOG = 
LoggerFactory.getLogger(PostgreSQLContainer.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(Postgres2StarRocksTest.class);
     private static final Path postgresJar = 
TestUtils.getResource("sort-connector-postgres-cdc.jar");
     private static final Path jdbcJar = 
TestUtils.getResource("sort-connector-starrocks.jar");
     private static final Path mysqlJdbcJar = 
TestUtils.getResource("mysql-driver.jar");
@@ -63,23 +60,12 @@ public class PostgresToStarRocksITCase extends 
FlinkContainerTestEnv {
 
     static {
         try {
-            sqlFile = 
Paths.get(PostgresToStarRocksITCase.class.getResource("/flinkSql/postgres_test.sql").toURI())
+            sqlFile = 
Paths.get(Postgres2StarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI())
                     .toString();
-            buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
     }
-
-    @ClassRule
-    public static StarRocksContainer STAR_ROCKS =
-            (StarRocksContainer) new 
StarRocksContainer(getNewStarRocksImageName())
-                    .withExposedPorts(9030, 8030, 8040)
-                    .withNetwork(NETWORK)
-                    .withAccessToHost(true)
-                    .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
-                    .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
-
     @ClassRule
     public static final PostgreSQLContainer POSTGRES_CONTAINER = 
(PostgreSQLContainer) new PostgreSQLContainer(
             
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"))
@@ -88,7 +74,14 @@ public class PostgresToStarRocksITCase extends 
FlinkContainerTestEnv {
                     .withDatabaseName("test")
                     .withNetwork(NETWORK)
                     .withNetworkAliases("postgres")
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+                    .withLogConsumer(new Slf4jLogConsumer(PG_LOG));
+    @ClassRule
+    public static final StarRocksContainer STAR_ROCKS =
+            (StarRocksContainer) new 
StarRocksContainer(getNewStarRocksImageName())
+                    .withExposedPorts(9030, 8030, 8040)
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+                    .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
     @Before
     public void setup() {
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java
new file mode 100644
index 0000000000..2332f2dcb8
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/RedisToRedisTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
+import org.apache.inlong.sort.tests.utils.RedisContainer;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.Jedis;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+public class RedisToRedisTest extends FlinkContainerTestEnv {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+    private static final Path redisJar = 
TestUtils.getResource("sort-connector-redis.jar");
+    private static final String sqlFile;
+    private static Jedis jedisSource;
+    private static Jedis jedisSink;
+
+    static {
+        try {
+            sqlFile = 
Paths.get(RedisToRedisTest.class.getResource("/flinkSql/redis_test.sql").toURI())
+                    .toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClassRule
+    public static final RedisContainer REDIS_CONTAINER_SOURCE = new 
RedisContainer(
+            DockerImageName.parse("redis:6.2.14"))
+                    .withExposedPorts(6379)
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("redis_source")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    @ClassRule
+    public static final RedisContainer REDIS_CONTAINER_SINK = new 
RedisContainer(
+            DockerImageName.parse("redis:6.2.14"))
+                    .withExposedPorts(6379)
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("redis_sink")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    @Before
+    public void setup() {
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        initializeRedisTable();
+    }
+
+    private void initializeRedisTable() {
+
+        int sourcePort = REDIS_CONTAINER_SOURCE.getRedisPort();
+        int sinkPort = REDIS_CONTAINER_SINK.getRedisPort();
+
+        jedisSource = new Jedis("127.0.0.1", sourcePort);
+        jedisSink = new Jedis("127.0.0.1", sinkPort);
+
+        jedisSource.set("1", "value_1");
+        jedisSource.set("2", "value_2");
+
+        jedisSource.hset("3", "1", "value_1");
+        jedisSource.hset("3", "2", "value_2");
+
+        // ZREVRANK TEST
+        jedisSource.zadd("rank", 10, "1");
+        jedisSource.zadd("rank", 20, "2");
+        jedisSource.zadd("rank", 30, "3");
+
+        // ZSCORE TEST
+        jedisSource.zadd("rank_score", 10, "1");
+        jedisSource.zadd("rank_score", 20, "2");
+        jedisSource.zadd("rank_score", 30, "3");
+
+    }
+
+    @AfterClass
+    public static void teardown() {
+        REDIS_CONTAINER_SOURCE.stop();
+        REDIS_CONTAINER_SINK.stop();
+    }
+
+    /**
+     * Test flink sql Redis source to Redis sink
+     *
+     * @throws Exception The exception may be thrown when executing the case
+     */
+    @Test
+    public void testRedisSourceAndSink() throws Exception {
+        submitSQLJob(sqlFile, redisJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("value_1_1", jedisSink.get("1_1"));
+            assertEquals("value_2_2", jedisSink.get("2_2"));
+        });
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("value_1", jedisSink.hget("3_3", "1"));
+            assertEquals("value_2", jedisSink.hget("3_3", "2"));
+        });
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("10.0", jedisSink.hget("rank_score_test", "1"));
+            assertEquals("20.0", jedisSink.hget("rank_score_test", "2"));
+            assertEquals("30.0", jedisSink.hget("rank_score_test", "3"));
+        });
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("2", jedisSink.hget("rank_test", "1"));
+            assertEquals("1", jedisSink.hget("rank_test", "2"));
+            assertEquals("0", jedisSink.hget("rank_test", "3"));
+        });
+    }
+
+}
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksITCase.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java
similarity index 95%
rename from 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksITCase.java
rename to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java
index b44f17d018..2204c8b9e0 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksITCase.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Sqlserver2StarRocksTest.java
@@ -45,13 +45,12 @@ import java.util.List;
 
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
-import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
 
-public class SqlserverToStarRocksITCase extends FlinkContainerTestEnv {
+public class Sqlserver2StarRocksTest extends FlinkContainerTestEnv {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SqlserverToStarRocksITCase.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(Sqlserver2StarRocksTest.class);
 
     private static final Path sqlserverJar = 
TestUtils.getResource("sort-connector-sqlserver-cdc.jar");
     private static final Path jdbcJar = 
TestUtils.getResource("sort-connector-starrocks.jar");
@@ -62,9 +61,8 @@ public class SqlserverToStarRocksITCase extends 
FlinkContainerTestEnv {
 
     static {
         try {
-            sqlFile = 
Paths.get(SqlserverToStarRocksITCase.class.getResource("/flinkSql/sqlserver_test.sql").toURI())
+            sqlFile = 
Paths.get(Sqlserver2StarRocksTest.class.getResource("/flinkSql/sqlserver_test.sql").toURI())
                     .toString();
-            buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
@@ -75,7 +73,6 @@ public class SqlserverToStarRocksITCase extends 
FlinkContainerTestEnv {
             (StarRocksContainer) new 
StarRocksContainer(getNewStarRocksImageName())
                     .withExposedPorts(9030, 8030, 8040)
                     .withNetwork(NETWORK)
-                    .withAccessToHost(true)
                     .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
                     .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index 2426c57ae4..35688e4f8f 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -177,9 +177,6 @@ public abstract class FlinkContainerTestEnv extends 
TestLogger {
      * <p>This method lazily initializes the REST client on-demand.
      */
     public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
-        if (restClusterClient != null) {
-            return restClusterClient;
-        }
         checkState(
                 jobManager.isRunning(),
                 "Cluster client should only be retrieved for a running 
cluster");
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/RedisContainer.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/RedisContainer.java
new file mode 100644
index 0000000000..6b7f462591
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/RedisContainer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class RedisContainer extends GenericContainer<RedisContainer> {
+
+    private static final DockerImageName DEFAULT_IMAGE_NAME = 
DockerImageName.parse("redis");
+
+    public static final Integer REDIS_PORT = 6379;
+
+    public RedisContainer(final DockerImageName dockerImageName) {
+        super(dockerImageName);
+        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+    }
+    public int getRedisPort() {
+        return getMappedPort(REDIS_PORT);
+    }
+
+}
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
index 3c60a4b3eb..4ccaacb523 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
@@ -39,7 +39,8 @@ public class StarRocksManager {
     private static final String NEW_STARROCKS_TAG = "latest";
     private static final String STAR_ROCKS_IMAGE_NAME = 
"starrocks/allin1-ubi:3.0.4";
     public static final Logger STAR_ROCKS_LOG = 
LoggerFactory.getLogger(StarRocksContainer.class);
-    public static void buildStarRocksImage() {
+
+    static {
         GenericContainer oldStarRocks = new 
GenericContainer(STAR_ROCKS_IMAGE_NAME);
         Startables.deepStart(Stream.of(oldStarRocks)).join();
         
oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh
index fc37959f77..2a8743f9fc 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/starrocks/start_fe_be.sh
@@ -63,7 +63,7 @@ log_stdin "init starrocks db end!"
 
 # health check the entire stack end-to-end and exit on failure.
 while sleep 10; do
-  PROCESS_STATUS=`mysql -uroot -h127.0.0.1 -P 9030 -e "show backends\G" |grep "Alive: true"`
+  PROCESS_STATUS=`mysql -uroot -h${MYFQDN} -P 9030 -e "show backends\G" |grep "Alive: true"`
   if [ -z "$PROCESS_STATUS" ]; then
         log_stdin "service has exited"
         exit 1;
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/redis_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/redis_test.sql
new file mode 100644
index 0000000000..a0e56230ed
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/redis_test.sql
@@ -0,0 +1,208 @@
+CREATE TABLE dim_get
+(
+    aaa varchar,
+    bbb varchar
+) WITH (
+      'connector' = 'redis-inlong',
+      'command' = 'get',
+      'host' = 'redis_source',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000'
+      );
+
+create table source_get
+(
+    aaa varchar,
+    proctime as procTime()
+) with ('connector' = 'datagen', 'rows-per-second' = '1',
+      'fields.aaa.kind' = 'sequence', 'fields.aaa.start' = '1', 'fields.aaa.end' = '2'
+      );
+CREATE TABLE sink_get
+(
+    aaa varchar,
+    bbb varchar,
+    PRIMARY KEY (`aaa`) NOT ENFORCED
+) WITH (
+      'connector' = 'redis-inlong',
+      'sink.batch-size' = '1',
+      'format' = 'csv',
+      'data-type' = 'PLAIN',
+      'redis-mode' = 'standalone',
+      'host' = 'redis_sink',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000'
+      );
+
+
+insert into sink_get
+
+select concat_ws('_', s.aaa, s.aaa), concat_ws('_', d.bbb, s.aaa)
+from source_get s
+         left join dim_get for system_time as of s.proctime as d
+                   on d.aaa = s.aaa;
+
+
+
+CREATE TABLE dim_hget
+(
+
+    aaa varchar,
+    bbb varchar
+) WITH (
+      'connector' = 'redis-inlong',
+      'command' = 'hget',
+      'host' = 'redis_source',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000',
+      'additional.key' = '3'
+      );
+
+create table source_hget
+(
+    aaa varchar,
+    bbb varchar,
+    proctime as procTime()
+) with ('connector' = 'datagen', 'rows-per-second' = '1',
+      'fields.aaa.kind' = 'sequence', 'fields.aaa.start' = '1', 'fields.aaa.end' = '2'
+      );
+CREATE TABLE sink_hget
+(
+
+    aaa varchar,
+    bbb varchar,
+    ccc varchar,
+    PRIMARY KEY (`aaa`) NOT ENFORCED
+) WITH (
+      'connector' = 'redis-inlong',
+      'sink.batch-size' = '1',
+      'format' = 'csv',
+      'data-type' = 'HASH',
+      'redis-mode' = 'standalone',
+      'host' = 'redis_sink',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000'
+      );
+
+insert into sink_hget
+select '3_3', d.aaa, d.bbb
+from source_hget s
+         left join dim_hget for system_time as of s.proctime as d
+                   on d.aaa = s.aaa and d.bbb = s.bbb;
+
+CREATE TABLE dim_zrevrank
+(
+    member_test varchar,
+    member_rank BIGINT
+) WITH (
+      'connector' = 'redis-inlong',
+      'command' = 'zrevrank',
+      'host' = 'redis_source',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000',
+      'additional.key' = 'rank'
+      );
+
+create table source_zrevrank
+(
+    member_test varchar,
+    proctime as procTime()
+) with ('connector' = 'datagen', 'rows-per-second' = '1',
+      'fields.member_test.kind' = 'sequence', 'fields.member_test.start' = '1', 'fields.member_test.end' = '3'
+      );
+CREATE TABLE sink_zrevrank
+(
+
+    aaa varchar,
+    bbb varchar,
+    ccc BIGINT,
+    PRIMARY KEY(`aaa`) NOT ENFORCED
+) WITH (
+      'connector' = 'redis-inlong',
+      'sink.batch-size' = '1',
+      'format' = 'csv',
+      'data-type' = 'HASH',
+      'redis-mode' = 'standalone',
+      'host' = 'redis_sink',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000'
+      );
+
+insert into sink_zrevrank
+select 'rank_test', s.member_test, d.member_rank
+from source_zrevrank s
+         left join dim_zrevrank for system_time as of s.proctime as d
+                   on d.member_test = s.member_test;
+
+
+
+CREATE TABLE dim_zscore
+(
+
+    member_test varchar,
+    score double
+) WITH (
+      'connector' = 'redis-inlong',
+      'command' = 'zscore',
+      'host' = 'redis_source',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000',
+      'additional.key' = 'rank_score'
+      );
+
+create table source_zscore
+(
+    member_test varchar,
+    proctime as procTime()
+) with ('connector' = 'datagen', 'rows-per-second' = '1',
+      'fields.member_test.kind' = 'sequence', 'fields.member_test.start' = '1', 'fields.member_test.end' = '3'
+      );
+CREATE TABLE sink_zscore
+(
+
+    aaa
+        varchar,
+    bbb
+        varchar,
+    ccc
+        double,
+    PRIMARY KEY(`aaa`) NOT ENFORCED
+) WITH (
+      'connector' = 'redis-inlong',
+      'sink.batch-size' = '1',
+      'format' = 'csv',
+      'data-type' = 'HASH',
+      'redis-mode' = 'standalone',
+      'host' = 'redis_sink',
+      'port' = '6379',
+      'maxIdle' = '8',
+      'minIdle' = '1',
+      'maxTotal' = '2',
+      'timeout' = '2000'
+      );
+
+insert into sink_zscore
+select 'rank_score_test', d.member_test, d.score
+from source_zscore s
+         left join dim_zscore for system_time as of s.proctime as d
+                   on d.member_test = s.member_test;
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties
index cc59d85482..8b0c655831 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/log4j2-test.properties
@@ -53,6 +53,12 @@ appender.postgres.fileName = target/logs/postgres.log
 appender.postgres.layout.type = PatternLayout
 appender.postgres.layout.pattern = - %m%n
 
+appender.redis.type = File
+appender.redis.name = redis
+appender.redis.fileName = target/logs/redis.log
+appender.redis.layout.type = PatternLayout
+appender.redis.layout.pattern = - %m%n
+
 logger.jm=INFO, jobmanager
 logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster
 logger.jm.additivity=false
@@ -62,11 +68,15 @@ logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor
 logger.tm.additivity=false
 
 logger.starrocks=INFO, starrocks
-logger.starrocks.name=org.apache.inlong.sort.tests.StarRocksContainer
+logger.starrocks.name=org.apache.inlong.sort.tests.utils.StarRocksContainer
 logger.starrocks.additivity=false
 
 logger.postgres=INFO, postgres
 logger.postgres.name=org.testcontainers.containers.PostgreSQLContainer
 logger.postgres.additivity=false
 
+logger.redis=INFO, redis
+logger.redis.name=org.apache.inlong.sort.tests.utils.RedisContainer
+logger.redis.additivity=false
+
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml
index c37b5b17a6..651cb8e910 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/pom.xml
@@ -27,31 +27,24 @@
     </parent>
 
     <artifactId>sort-connector-redis-v1.15</artifactId>
+    <packaging>jar</packaging>
     <name>Apache InLong - Sort-connector-redis</name>
 
     <properties>
         <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
     </properties>
     <dependencies>
-        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-jackson -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
             <version>${flink.shaded.jackson}</version>
+            <scope>test</scope>
         </dependency>
-
-        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-api-java-bridge</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
-            <version>${flink.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.bahir</groupId>
@@ -99,7 +92,6 @@
             <version>${embedded.redis.version}</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
@@ -114,16 +106,9 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <type>test-jar</type>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hbase</groupId>
-                    <artifactId>hbase-common</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -131,7 +116,6 @@
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients</artifactId>
@@ -149,6 +133,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
                 <executions>
                     <execution>
                         <id>shade-flink</id>
@@ -157,6 +142,22 @@
                         </goals>
                         <phase>package</phase>
                         <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>io.streamnative.connectors:flink-protobuf</include>
+                                    <include>org.apache.flink:flink-connector-base</include>
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>org.apache.bahir:flink-connector-redis_${flink.scala.binary.version}</include>
+                                    <include>org.apache.commons:commons-lang3</include>
+                                    <include>org.apache.commons:commons-pool2</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>javax.*:*</include>
+                                    <include>org.lz4*:*</include>
+                                    <include>org.slf4j:jul-to-slf4j</include>
+                                    <include>redis.clients:jedis</include>
+                                </includes>
+                            </artifactSet>
                             <filters>
                                 <filter>
                                     <artifact>org.apache.inlong:sort-connector-*</artifact>
@@ -167,6 +168,28 @@
                                     </includes>
                                 </filter>
                             </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.commons.pool2</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.commons.pool2</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.logging</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.commons.logging</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.lang3</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.commons.lang3</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.redis.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.redis.shaded.com.google</shadedPattern>
+                                </relocation>
+                            </relocations>
                         </configuration>
                     </execution>
                 </executions>


Reply via email to