This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 07510ed937 [Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055) 07510ed937 is described below commit 07510ed937a0dfa54f479bd1ab25c5f8f5cc6763 Author: limin <zmbw2...@163.com> AuthorDate: Thu Mar 27 15:04:41 2025 +0800 [Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055) Co-authored-by: limin <li...@wedobest.com.cn> --- .../seatunnel/redis/client/RedisSingleClient.java | 54 ++++-- .../seatunnel/redis/exception/RedisErrorCode.java | 3 +- .../e2e/connector/redis/RedisMasterAndSlaveIT.java | 181 +++++++++++++++++++++ .../fake-to-redis-test-readonly-hash.conf | 67 ++++++++ .../resources/fake-to-redis-test-readonly-key.conf | 66 ++++++++ .../fake-to-redis-test-readonly-list.conf | 66 ++++++++ .../resources/fake-to-redis-test-readonly-set.conf | 66 ++++++++ .../fake-to-redis-test-readonly-zset.conf | 66 ++++++++ 8 files changed, 553 insertions(+), 16 deletions(-) diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java index c9d3ba6788..c0b6572c7c 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java @@ -20,12 +20,15 @@ package org.apache.seatunnel.connectors.seatunnel.redis.client; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; +import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; +import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode; import org.apache.commons.collections4.CollectionUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; +import redis.clients.jedis.exceptions.JedisException; import java.util.ArrayList; import java.util.List; @@ -142,6 +145,7 @@ public class RedisSingleClient extends RedisClient { @Override public void batchWriteString( List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) { + List<Response<?>> responses = new ArrayList<>(); Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { @@ -149,20 +153,22 @@ public class RedisSingleClient extends RedisClient { String key = keys.get(i); String value = values.get(i); if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { - pipelined.del(key); + responses.add(pipelined.del(key)); } else { - pipelined.set(key, value); + responses.add(pipelined.set(key, value)); if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + responses.add(pipelined.expire(key, expireSeconds)); } } } pipelined.sync(); + processResponses(responses); } @Override public void batchWriteList( List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) { + List<Response<?>> responses = new ArrayList<>(); Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { @@ -170,20 +176,22 @@ public class RedisSingleClient extends RedisClient { String key = keys.get(i); String value = values.get(i); if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { - pipelined.lrem(key, 1, value); + responses.add(pipelined.lrem(key, 1, value)); } else { - pipelined.lpush(key, value); + responses.add(pipelined.lpush(key, value)); if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + responses.add(pipelined.expire(key, expireSeconds)); } } } pipelined.sync(); + processResponses(responses); } @Override public void batchWriteSet( List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) { + List<Response<?>> responses = new ArrayList<>(); Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { @@ -191,20 +199,22 @@ public class RedisSingleClient extends RedisClient { String key = keys.get(i); String value = values.get(i); if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { - pipelined.srem(key, value); + responses.add(pipelined.srem(key, value)); } else { - pipelined.sadd(key, value); + responses.add(pipelined.sadd(key, value)); if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + responses.add(pipelined.expire(key, expireSeconds)); } } } pipelined.sync(); + processResponses(responses); } @Override public void batchWriteHash( List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) { + List<Response<?>> responses = new ArrayList<>(); Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { @@ -214,21 +224,23 @@ public class RedisSingleClient extends RedisClient { Map<String, String> fieldsMap = JsonUtils.toMap(value); if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { for (Map.Entry<String, String> entry : fieldsMap.entrySet()) { - pipelined.hdel(key, entry.getKey()); + responses.add(pipelined.hdel(key, entry.getKey())); } } else { - pipelined.hset(key, fieldsMap); + responses.add(pipelined.hset(key, fieldsMap)); if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + responses.add(pipelined.expire(key, expireSeconds)); } } } pipelined.sync(); + processResponses(responses); } @Override public void batchWriteZset( List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) { + List<Response<?>> responses = new ArrayList<>(); Pipeline pipelined = jedis.pipelined(); int size = keys.size(); for (int i = 0; i < size; i++) { @@ -236,14 +248,26 @@ public class RedisSingleClient extends RedisClient { String key = keys.get(i); String value = values.get(i); if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { - pipelined.zrem(key, value); + responses.add(pipelined.zrem(key, value)); } else { - pipelined.zadd(key, 1, value); + responses.add(pipelined.zadd(key, 1, value)); if (expireSeconds > 0) { - pipelined.expire(key, expireSeconds); + responses.add(pipelined.expire(key, expireSeconds)); } } } pipelined.sync(); + processResponses(responses); + } + + private void processResponses(List<Response<?>> responseList) { + try { + for (Response<?> response : responseList) { + // If the response is an exception object, it will be thrown + response.get(); + } + } catch (JedisException e) { + throw new RedisConnectorException(RedisErrorCode.GET_RESPONSE_FAILED, e); + } } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java index 4d9bb745bd..a82a1425de 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java @@ -20,7 +20,8 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; public enum RedisErrorCode implements SeaTunnelErrorCode { GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the redis version"), - INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"); + INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"), + GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write response"); private final String code; private final String description; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java new file mode 100644 index 0000000000..9c84bb3bf8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.e2e.connector.redis; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; +import redis.clients.jedis.Jedis; + +import java.time.Duration; +import java.util.Objects; +import java.util.stream.Stream; + +@Slf4j +public class RedisMasterAndSlaveIT extends TestSuiteBase implements TestResource { + private static RedisContainerInfo masterContainerInfo; + private static RedisContainerInfo slaveContainerInfo; + private static GenericContainer<?> master; + private static GenericContainer<?> slave; + private Jedis slaveJedis; + + @BeforeAll + @Override + public void startUp() throws Exception { + masterContainerInfo = + new RedisContainerInfo("redis-e2e-master", 6379, "SeaTunnel", "redis:7"); + master = + new GenericContainer<>(DockerImageName.parse(masterContainerInfo.getImageName())) + .withNetwork(NETWORK) + .withNetworkAliases(masterContainerInfo.getHost()) + .withExposedPorts(masterContainerInfo.getPort()) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger( + masterContainerInfo.getImageName()))) + .withCommand( + String.format( + "redis-server --requirepass %s", + masterContainerInfo.getPassword())) + .waitingFor( + new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + master.start(); + log.info("Redis master container started"); + + slaveContainerInfo = + new RedisContainerInfo("redis-e2e-slave", 6379, "SeaTunnel", "redis:7"); + slave = + new GenericContainer<>(DockerImageName.parse(slaveContainerInfo.getImageName())) + .withNetwork(NETWORK) + .withNetworkAliases(slaveContainerInfo.getHost()) + .withExposedPorts(slaveContainerInfo.getPort()) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger( + slaveContainerInfo.getImageName()))) + .withCommand( + String.format( + "redis-server --requirepass %s --slaveof %s %s --masterauth %s", + slaveContainerInfo.getPassword(), + masterContainerInfo.getHost(), + masterContainerInfo.getPort(), + masterContainerInfo.getPassword())) + .waitingFor( + new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + slave.start(); + log.info("Redis slave container started"); + Startables.deepStart(Stream.of(master, slave)).join(); + this.initSlaveJedis(); + } + + private void initSlaveJedis() { + Jedis jedis = new Jedis(slave.getHost(), slave.getFirstMappedPort()); + jedis.auth(slaveContainerInfo.getPassword()); + jedis.ping(); + this.slaveJedis = jedis; + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (Objects.nonNull(slaveJedis)) { + slaveJedis.close(); + } + + if (Objects.nonNull(slave)) { + slave.close(); + } + if (Objects.nonNull(master)) { + master.close(); + } + } + + @TestTemplate + public void testWriteKeyToReadOnlyRedis(TestContainer container) { + try { + container.executeJob("/fake-to-redis-test-readonly-key.conf"); + } catch (Exception e) { + String containerLogs = container.getServerLogs(); + Assertions.assertTrue( + containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException")); + } + Assertions.assertEquals(null, slaveJedis.get("key_check")); + } + + @TestTemplate + public void testWriteListToReadOnlyRedis(TestContainer container) { + try { + container.executeJob("/fake-to-redis-test-readonly-list.conf"); + } catch (Exception e) { + String containerLogs = container.getServerLogs(); + Assertions.assertTrue( + containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException")); + } + Assertions.assertEquals(0, slaveJedis.llen("list_check")); + } + + @TestTemplate + public void testWriteSetToReadOnlyRedis(TestContainer container) { + try { + container.executeJob("/fake-to-redis-test-readonly-set.conf"); + } catch (Exception e) { + String containerLogs = container.getServerLogs(); + Assertions.assertTrue( + containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException")); + } + Assertions.assertEquals(0, slaveJedis.scard("set_check")); + } + + @TestTemplate + public void testWriteZSetToReadOnlyRedis(TestContainer container) { + try { + container.executeJob("/fake-to-redis-test-readonly-zset.conf"); + } catch (Exception e) { + String containerLogs = container.getServerLogs(); + Assertions.assertTrue( + containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException")); + } + Assertions.assertEquals(0, slaveJedis.zcard("zset_check")); + } + + @TestTemplate + public void testWriteHashToReadOnlyRedis(TestContainer container) { + try { + container.executeJob("/fake-to-redis-test-readonly-hash.conf"); + } catch (Exception e) { + String containerLogs = container.getServerLogs(); + Assertions.assertTrue( + containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException")); + } + Assertions.assertEquals(0, slaveJedis.hlen("hash_check")); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf new file mode 100644 index 0000000000..bd429f0908 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e-slave" + port = 6379 + auth = "U2VhVHVubmVs" + key = "hash_check" + data_type = hash + hash_key_field = "id" + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf new file mode 100644 index 0000000000..47d41e0764 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e-slave" + port = 6379 + auth = "U2VhVHVubmVs" + key = "key_check" + data_type = key + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf new file mode 100644 index 0000000000..c3cceb88f4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e-slave" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list_check" + data_type = list + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf new file mode 100644 index 0000000000..174301e823 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e-slave" + port = 6379 + auth = "U2VhVHVubmVs" + key = "set_check" + data_type = set + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf new file mode 100644 index 0000000000..28a3c9d374 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } +} + +sink { + Redis { + host = "redis-e2e-slave" + port = 6379 + auth = "U2VhVHVubmVs" + key = "zset_check" + data_type = zset + batch_size = 33 + } +} \ No newline at end of file