This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fed89ae3fc [Improve][Connector-V2]Support multi-table sink feature for
redis (#6314)
fed89ae3fc is described below
commit fed89ae3fcbf31708051db38d0b1253567bf923f
Author: lizhenglei <[email protected]>
AuthorDate: Fri Feb 23 11:16:23 2024 +0800
[Improve][Connector-V2]Support multi-table sink feature for redis (#6314)
---
.../connectors/seatunnel/redis/sink/RedisSink.java | 4 +-
.../seatunnel/redis/sink/RedisSinkWriter.java | 4 +-
.../seatunnel/e2e/connector/redis/RedisIT.java | 20 ++++-
.../resources/fake-to-multipletableredissink.conf | 97 ++++++++++++++++++++++
4 files changed, 122 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index ac8c544703..7e6d23dbec 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -29,7 +30,8 @@ import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import java.io.IOException;
-public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
private final RedisParameters redisParameters = new RedisParameters();
private SeaTunnelRowType seaTunnelRowType;
private ReadonlyConfig readonlyConfig;
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 80b1449b9d..23eda57202 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.sink;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -32,7 +33,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final RedisParameters redisParameters;
private final SerializationSchema serializationSchema;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 2a2feb7744..7b03818c0b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -28,7 +28,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.junit.jupiter.api.AfterAll;
@@ -212,7 +214,7 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
}
@TestTemplate
- public void restRedisDbNum(TestContainer container) throws IOException,
InterruptedException {
+ public void testRedisDbNum(TestContainer container) throws IOException,
InterruptedException {
Container.ExecResult execResult =
container.executeJob("/redis-to-redis-by-db-num.conf");
Assertions.assertEquals(0, execResult.getExitCode());
jedis.select(2);
@@ -220,4 +222,20 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
jedis.del("db_test");
jedis.select(0);
}
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple
table read")
+ public void testMultipletableRedisSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-multipletableredissink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ jedis.select(3);
+ Assertions.assertEquals(2, jedis.llen("key_multi_list"));
+ jedis.del("key_multi_list");
+ jedis.select(0);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf
new file mode 100644
index 0000000000..051c51eea6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "redis_sink_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "lzl",
"2020-02-02T02:02:02"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "redis_sink_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "lzl",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ db_num=3
+ auth = "U2VhVHVubmVs"
+ key = "key_multi_list"
+ data_type = list
+ }
+}