This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 07510ed937 [Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055)
07510ed937 is described below

commit 07510ed937a0dfa54f479bd1ab25c5f8f5cc6763
Author: limin <zmbw2...@163.com>
AuthorDate: Thu Mar 27 15:04:41 2025 +0800

    [Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055)
    
    Co-authored-by: limin <li...@wedobest.com.cn>
---
 .../seatunnel/redis/client/RedisSingleClient.java  |  54 ++++--
 .../seatunnel/redis/exception/RedisErrorCode.java  |   3 +-
 .../e2e/connector/redis/RedisMasterAndSlaveIT.java | 181 +++++++++++++++++++++
 .../fake-to-redis-test-readonly-hash.conf          |  67 ++++++++
 .../resources/fake-to-redis-test-readonly-key.conf |  66 ++++++++
 .../fake-to-redis-test-readonly-list.conf          |  66 ++++++++
 .../resources/fake-to-redis-test-readonly-set.conf |  66 ++++++++
 .../fake-to-redis-test-readonly-zset.conf          |  66 ++++++++
 8 files changed, 553 insertions(+), 16 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index c9d3ba6788..c0b6572c7c 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -20,12 +20,15 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.client;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Pipeline;
 import redis.clients.jedis.Response;
+import redis.clients.jedis.exceptions.JedisException;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -142,6 +145,7 @@ public class RedisSingleClient extends RedisClient {
     @Override
     public void batchWriteString(
             List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
+        List<Response<?>> responses = new ArrayList<>();
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
@@ -149,20 +153,22 @@ public class RedisSingleClient extends RedisClient {
             String key = keys.get(i);
             String value = values.get(i);
             if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
-                pipelined.del(key);
+                responses.add(pipelined.del(key));
             } else {
-                pipelined.set(key, value);
+                responses.add(pipelined.set(key, value));
                 if (expireSeconds > 0) {
-                    pipelined.expire(key, expireSeconds);
+                    responses.add(pipelined.expire(key, expireSeconds));
                 }
             }
         }
         pipelined.sync();
+        processResponses(responses);
     }
 
     @Override
     public void batchWriteList(
             List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
+        List<Response<?>> responses = new ArrayList<>();
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
@@ -170,20 +176,22 @@ public class RedisSingleClient extends RedisClient {
             String key = keys.get(i);
             String value = values.get(i);
             if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
-                pipelined.lrem(key, 1, value);
+                responses.add(pipelined.lrem(key, 1, value));
             } else {
-                pipelined.lpush(key, value);
+                responses.add(pipelined.lpush(key, value));
                 if (expireSeconds > 0) {
-                    pipelined.expire(key, expireSeconds);
+                    responses.add(pipelined.expire(key, expireSeconds));
                 }
             }
         }
         pipelined.sync();
+        processResponses(responses);
     }
 
     @Override
     public void batchWriteSet(
             List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
+        List<Response<?>> responses = new ArrayList<>();
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
@@ -191,20 +199,22 @@ public class RedisSingleClient extends RedisClient {
             String key = keys.get(i);
             String value = values.get(i);
             if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
-                pipelined.srem(key, value);
+                responses.add(pipelined.srem(key, value));
             } else {
-                pipelined.sadd(key, value);
+                responses.add(pipelined.sadd(key, value));
                 if (expireSeconds > 0) {
-                    pipelined.expire(key, expireSeconds);
+                    responses.add(pipelined.expire(key, expireSeconds));
                 }
             }
         }
         pipelined.sync();
+        processResponses(responses);
     }
 
     @Override
     public void batchWriteHash(
             List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
+        List<Response<?>> responses = new ArrayList<>();
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
@@ -214,21 +224,23 @@ public class RedisSingleClient extends RedisClient {
             Map<String, String> fieldsMap = JsonUtils.toMap(value);
             if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
                 for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
-                    pipelined.hdel(key, entry.getKey());
+                    responses.add(pipelined.hdel(key, entry.getKey()));
                 }
             } else {
-                pipelined.hset(key, fieldsMap);
+                responses.add(pipelined.hset(key, fieldsMap));
                 if (expireSeconds > 0) {
-                    pipelined.expire(key, expireSeconds);
+                    responses.add(pipelined.expire(key, expireSeconds));
                 }
             }
         }
         pipelined.sync();
+        processResponses(responses);
     }
 
     @Override
     public void batchWriteZset(
             List<RowKind> rowKinds, List<String> keys, List<String> values, 
long expireSeconds) {
+        List<Response<?>> responses = new ArrayList<>();
         Pipeline pipelined = jedis.pipelined();
         int size = keys.size();
         for (int i = 0; i < size; i++) {
@@ -236,14 +248,26 @@ public class RedisSingleClient extends RedisClient {
             String key = keys.get(i);
             String value = values.get(i);
             if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) 
{
-                pipelined.zrem(key, value);
+                responses.add(pipelined.zrem(key, value));
             } else {
-                pipelined.zadd(key, 1, value);
+                responses.add(pipelined.zadd(key, 1, value));
                 if (expireSeconds > 0) {
-                    pipelined.expire(key, expireSeconds);
+                    responses.add(pipelined.expire(key, expireSeconds));
                 }
             }
         }
         pipelined.sync();
+        processResponses(responses);
+    }
+
+    private void processResponses(List<Response<?>> responseList) {
+        try {
+            for (Response<?> response : responseList) {
+                // If the response is an exception object, it will be thrown
+                response.get();
+            }
+        } catch (JedisException e) {
+            throw new 
RedisConnectorException(RedisErrorCode.GET_RESPONSE_FAILED, e);
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
index 4d9bb745bd..a82a1425de 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
@@ -20,7 +20,8 @@ import 
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
 public enum RedisErrorCode implements SeaTunnelErrorCode {
     GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the 
redis version"),
-    INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config");
+    INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"),
+    GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write 
response");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
new file mode 100644
index 0000000000..9c84bb3bf8
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.Jedis;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+public class RedisMasterAndSlaveIT extends TestSuiteBase implements 
TestResource {
+    private static RedisContainerInfo masterContainerInfo;
+    private static RedisContainerInfo slaveContainerInfo;
+    private static GenericContainer<?> master;
+    private static GenericContainer<?> slave;
+    private Jedis slaveJedis;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        masterContainerInfo =
+                new RedisContainerInfo("redis-e2e-master", 6379, "SeaTunnel", 
"redis:7");
+        master =
+                new 
GenericContainer<>(DockerImageName.parse(masterContainerInfo.getImageName()))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(masterContainerInfo.getHost())
+                        .withExposedPorts(masterContainerInfo.getPort())
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                
masterContainerInfo.getImageName())))
+                        .withCommand(
+                                String.format(
+                                        "redis-server --requirepass %s",
+                                        masterContainerInfo.getPassword()))
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        
.withStartupTimeout(Duration.ofMinutes(2)));
+        master.start();
+        log.info("Redis master container started");
+
+        slaveContainerInfo =
+                new RedisContainerInfo("redis-e2e-slave", 6379, "SeaTunnel", 
"redis:7");
+        slave =
+                new 
GenericContainer<>(DockerImageName.parse(slaveContainerInfo.getImageName()))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(slaveContainerInfo.getHost())
+                        .withExposedPorts(slaveContainerInfo.getPort())
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                
slaveContainerInfo.getImageName())))
+                        .withCommand(
+                                String.format(
+                                        "redis-server --requirepass %s 
--slaveof %s %s --masterauth %s",
+                                        slaveContainerInfo.getPassword(),
+                                        masterContainerInfo.getHost(),
+                                        masterContainerInfo.getPort(),
+                                        masterContainerInfo.getPassword()))
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        
.withStartupTimeout(Duration.ofMinutes(2)));
+        slave.start();
+        log.info("Redis slave container started");
+        Startables.deepStart(Stream.of(master, slave)).join();
+        this.initSlaveJedis();
+    }
+
+    private void initSlaveJedis() {
+        Jedis jedis = new Jedis(slave.getHost(), slave.getFirstMappedPort());
+        jedis.auth(slaveContainerInfo.getPassword());
+        jedis.ping();
+        this.slaveJedis = jedis;
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (Objects.nonNull(slaveJedis)) {
+            slaveJedis.close();
+        }
+
+        if (Objects.nonNull(slave)) {
+            slave.close();
+        }
+        if (Objects.nonNull(master)) {
+            master.close();
+        }
+    }
+
+    @TestTemplate
+    public void testWriteKeyToReadOnlyRedis(TestContainer container) {
+        try {
+            container.executeJob("/fake-to-redis-test-readonly-key.conf");
+        } catch (Exception e) {
+            String containerLogs = container.getServerLogs();
+            Assertions.assertTrue(
+                    
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+        }
+        Assertions.assertEquals(null, slaveJedis.get("key_check"));
+    }
+
+    @TestTemplate
+    public void testWriteListToReadOnlyRedis(TestContainer container) {
+        try {
+            container.executeJob("/fake-to-redis-test-readonly-list.conf");
+        } catch (Exception e) {
+            String containerLogs = container.getServerLogs();
+            Assertions.assertTrue(
+                    
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+        }
+        Assertions.assertEquals(0, slaveJedis.llen("list_check"));
+    }
+
+    @TestTemplate
+    public void testWriteSetToReadOnlyRedis(TestContainer container) {
+        try {
+            container.executeJob("/fake-to-redis-test-readonly-set.conf");
+        } catch (Exception e) {
+            String containerLogs = container.getServerLogs();
+            Assertions.assertTrue(
+                    
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+        }
+        Assertions.assertEquals(0, slaveJedis.scard("set_check"));
+    }
+
+    @TestTemplate
+    public void testWriteZSetToReadOnlyRedis(TestContainer container) {
+        try {
+            container.executeJob("/fake-to-redis-test-readonly-zset.conf");
+        } catch (Exception e) {
+            String containerLogs = container.getServerLogs();
+            Assertions.assertTrue(
+                    
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+        }
+        Assertions.assertEquals(0, slaveJedis.zcard("zset_check"));
+    }
+
+    @TestTemplate
+    public void testWriteHashToReadOnlyRedis(TestContainer container) {
+        try {
+            container.executeJob("/fake-to-redis-test-readonly-hash.conf");
+        } catch (Exception e) {
+            String containerLogs = container.getServerLogs();
+            Assertions.assertTrue(
+                    
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+        }
+        Assertions.assertEquals(0, slaveJedis.hlen("hash_check"));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf
new file mode 100644
index 0000000000..bd429f0908
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e-slave"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "hash_check"
+    data_type = hash
+    hash_key_field = "id"
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf
new file mode 100644
index 0000000000..47d41e0764
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e-slave"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "key_check"
+    data_type = key
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf
new file mode 100644
index 0000000000..c3cceb88f4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e-slave"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "list_check"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf
new file mode 100644
index 0000000000..174301e823
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e-slave"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "set_check"
+    data_type = set
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf
new file mode 100644
index 0000000000..28a3c9d374
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e-slave"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "zset_check"
+    data_type = zset
+    batch_size = 33
+  }
+}
\ No newline at end of file

Reply via email to