This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f1c313eea6 [improve] update Redis connector config option (#8631)
f1c313eea6 is described below

commit f1c313eea6b8635dd928d44a73c3d86fe40a4346
Author: Asish <labsa...@gmail.com>
AuthorDate: Tue Feb 11 19:52:15 2025 +0530

    [improve] update Redis connector config option (#8631)
    
    Co-authored-by: asishupadhyay <akula...@gmail.com>
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 .../{RedisConfig.java => RedisBaseOptions.java}    | 56 +++----------------
 .../seatunnel/redis/config/RedisParameters.java    | 62 +++++++++++-----------
 .../seatunnel/redis/config/RedisSinkOptions.java   | 54 +++++++++++++++++++
 .../seatunnel/redis/config/RedisSourceOptions.java | 35 ++++++++++++
 .../connectors/seatunnel/redis/sink/RedisSink.java |  4 +-
 .../seatunnel/redis/sink/RedisSinkFactory.java     | 33 +++++++-----
 .../seatunnel/redis/source/RedisSource.java        | 10 ++--
 .../seatunnel/redis/source/RedisSourceFactory.java | 28 +++++-----
 .../seatunnel/redis/source/RedisSourceReader.java  |  4 +-
 10 files changed, 171 insertions(+), 117 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index e11291c418..9cf980f4bd 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -185,7 +185,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("PaimonSinkOptions");
         whiteList.add("TDengineSourceOptions");
         whiteList.add("PulsarSourceOptions");
-        whiteList.add("RedisSinkOptions");
         whiteList.add("FakeSourceOptions");
         whiteList.add("HbaseSinkOptions");
         whiteList.add("MongodbSinkOptions");
@@ -230,7 +229,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("SocketSourceOptions");
         whiteList.add("OpenMldbSourceOptions");
         whiteList.add("Web3jSourceOptions");
-        whiteList.add("RedisSourceOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
         whiteList.add("OracleIncrementalSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
similarity index 67%
rename from 
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
rename to 
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
index c9809868dc..892d110cc2 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 
 import java.util.List;
 
-public class RedisConfig {
+public class RedisBaseOptions {
 
     public static final String CONNECTOR_IDENTITY = "Redis";
 
@@ -31,11 +31,6 @@ public class RedisConfig {
         CLUSTER;
     }
 
-    public enum HashKeyParseMode {
-        ALL,
-        KV;
-    }
-
     public static final Option<String> HOST =
             Options.key("host")
                     .stringType()
@@ -85,16 +80,16 @@ public class RedisConfig {
                     .noDefaultValue()
                     .withDescription("redis data types, support string hash 
list set zset.");
 
-    public static final Option<RedisConfig.Format> FORMAT =
+    public static final Option<RedisBaseOptions.Format> FORMAT =
             Options.key("format")
-                    .enumType(RedisConfig.Format.class)
-                    .defaultValue(RedisConfig.Format.JSON)
+                    .enumType(RedisBaseOptions.Format.class)
+                    .defaultValue(RedisBaseOptions.Format.JSON)
                     .withDescription(
                             "the format of upstream data, now only support 
json and text, default json.");
 
-    public static final Option<RedisConfig.RedisMode> MODE =
+    public static final Option<RedisBaseOptions.RedisMode> MODE =
             Options.key("mode")
-                    .enumType(RedisConfig.RedisMode.class)
+                    .enumType(RedisBaseOptions.RedisMode.class)
                     .defaultValue(RedisMode.SINGLE)
                     .withDescription(
                             "redis mode, support single or cluster, default 
value is single");
@@ -106,19 +101,6 @@ public class RedisConfig {
                     .withDescription(
                             "redis nodes information, used in cluster mode, 
must like as the following format: [host1:port1, host2:port2]");
 
-    public static final Option<RedisConfig.HashKeyParseMode> 
HASH_KEY_PARSE_MODE =
-            Options.key("hash_key_parse_mode")
-                    .enumType(RedisConfig.HashKeyParseMode.class)
-                    .defaultValue(HashKeyParseMode.ALL)
-                    .withDescription(
-                            "hash key parse mode, support all or kv, default 
value is all");
-
-    public static final Option<Long> EXPIRE =
-            Options.key("expire")
-                    .longType()
-                    .defaultValue(-1L)
-                    .withDescription("Set redis expiration time.");
-
     public static final Option<Integer> BATCH_SIZE =
             Options.key("batch_size")
                     .intType()
@@ -127,32 +109,6 @@ public class RedisConfig {
                             "batch_size is used to control the size of a batch 
of data during read and write operations"
                                     + ",default 10");
 
-    public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
-            Options.key("support_custom_key")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "if true, the key can be customized by the field 
value in the upstream data.");
-
-    public static final Option<String> VALUE_FIELD =
-            Options.key("value_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The field of value you want to write to redis, 
support string list set zset");
-
-    public static final Option<String> HASH_KEY_FIELD =
-            Options.key("hash_key_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The field of hash key you want to write 
to redis");
-
-    public static final Option<String> HASH_VALUE_FIELD =
-            Options.key("hash_value_field")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The field of hash value you want to 
write to redis");
-
     public enum Format {
         JSON,
         // TEXT will be supported later
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 6dff3cba71..4425be182f 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -52,11 +52,11 @@ public class RedisParameters implements Serializable {
     private String keysPattern;
     private String keyField;
     private RedisDataType redisDataType;
-    private RedisConfig.RedisMode mode;
-    private RedisConfig.HashKeyParseMode hashKeyParseMode;
+    private RedisBaseOptions.RedisMode mode;
+    private RedisSourceOptions.HashKeyParseMode hashKeyParseMode;
     private List<String> redisNodes = Collections.emptyList();
-    private long expire = RedisConfig.EXPIRE.defaultValue();
-    private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
+    private long expire = RedisSinkOptions.EXPIRE.defaultValue();
+    private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue();
     private Boolean supportCustomKey;
     private String valueField;
     private String hashKeyField;
@@ -66,63 +66,63 @@ public class RedisParameters implements Serializable {
 
     public void buildWithConfig(ReadonlyConfig config) {
         // set host
-        this.host = config.get(RedisConfig.HOST);
+        this.host = config.get(RedisBaseOptions.HOST);
         // set port
-        this.port = config.get(RedisConfig.PORT);
+        this.port = config.get(RedisBaseOptions.PORT);
         // set db_num
-        this.dbNum = config.get(RedisConfig.DB_NUM);
+        this.dbNum = config.get(RedisBaseOptions.DB_NUM);
         // set hash key mode
-        this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE);
+        this.hashKeyParseMode = 
config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE);
         // set expire
-        this.expire = config.get(RedisConfig.EXPIRE);
+        this.expire = config.get(RedisSinkOptions.EXPIRE);
         // set auth
-        if (config.getOptional(RedisConfig.AUTH).isPresent()) {
-            this.auth = config.get(RedisConfig.AUTH);
+        if (config.getOptional(RedisBaseOptions.AUTH).isPresent()) {
+            this.auth = config.get(RedisBaseOptions.AUTH);
         }
         // set user
-        if (config.getOptional(RedisConfig.USER).isPresent()) {
-            this.user = config.get(RedisConfig.USER);
+        if (config.getOptional(RedisBaseOptions.USER).isPresent()) {
+            this.user = config.get(RedisBaseOptions.USER);
         }
         // set mode
-        this.mode = config.get(RedisConfig.MODE);
+        this.mode = config.get(RedisBaseOptions.MODE);
         // set redis nodes information
-        if (config.getOptional(RedisConfig.NODES).isPresent()) {
-            this.redisNodes = config.get(RedisConfig.NODES);
+        if (config.getOptional(RedisBaseOptions.NODES).isPresent()) {
+            this.redisNodes = config.get(RedisBaseOptions.NODES);
         }
         // set key
-        if (config.getOptional(RedisConfig.KEY).isPresent()) {
-            this.keyField = config.get(RedisConfig.KEY);
+        if (config.getOptional(RedisBaseOptions.KEY).isPresent()) {
+            this.keyField = config.get(RedisBaseOptions.KEY);
         }
         // set keysPattern
-        if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) {
-            this.keysPattern = config.get(RedisConfig.KEY_PATTERN);
+        if (config.getOptional(RedisBaseOptions.KEY_PATTERN).isPresent()) {
+            this.keysPattern = config.get(RedisBaseOptions.KEY_PATTERN);
         }
         // set redis data type verification factory createAndPrepareSource
-        this.redisDataType = config.get(RedisConfig.DATA_TYPE);
+        this.redisDataType = config.get(RedisBaseOptions.DATA_TYPE);
         // Indicates the number of keys to attempt to return per 
iteration.default 10
-        this.batchSize = config.get(RedisConfig.BATCH_SIZE);
+        this.batchSize = config.get(RedisBaseOptions.BATCH_SIZE);
         // set support custom key
-        if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) {
-            this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY);
+        if 
(config.getOptional(RedisSinkOptions.SUPPORT_CUSTOM_KEY).isPresent()) {
+            this.supportCustomKey = 
config.get(RedisSinkOptions.SUPPORT_CUSTOM_KEY);
         }
         // set value field
-        if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) {
-            this.valueField = config.get(RedisConfig.VALUE_FIELD);
+        if (config.getOptional(RedisSinkOptions.VALUE_FIELD).isPresent()) {
+            this.valueField = config.get(RedisSinkOptions.VALUE_FIELD);
         }
         // set hash key field
-        if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) {
-            this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD);
+        if (config.getOptional(RedisSinkOptions.HASH_KEY_FIELD).isPresent()) {
+            this.hashKeyField = config.get(RedisSinkOptions.HASH_KEY_FIELD);
         }
         // set hash value field
-        if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) {
-            this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD);
+        if (config.getOptional(RedisSinkOptions.HASH_VALUE_FIELD).isPresent()) 
{
+            this.hashValueField = 
config.get(RedisSinkOptions.HASH_VALUE_FIELD);
         }
     }
 
     public RedisClient buildRedisClient() {
         Jedis jedis = this.buildJedis();
         this.redisVersion = extractRedisVersion(jedis);
-        if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
+        if (mode.equals(RedisBaseOptions.RedisMode.SINGLE)) {
             return new RedisSingleClient(this, jedis, redisVersion);
         } else {
             return new RedisClusterClient(this, jedis, redisVersion);
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java
new file mode 100644
index 0000000000..960204321d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class RedisSinkOptions extends RedisBaseOptions {
+
+    public static final Option<Long> EXPIRE =
+            Options.key("expire")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription("Set redis expiration time.");
+
+    public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
+            Options.key("support_custom_key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "if true, the key can be customized by the field 
value in the upstream data.");
+    public static final Option<String> VALUE_FIELD =
+            Options.key("value_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The field of value you want to write to redis, 
support string list set zset");
+    public static final Option<String> HASH_KEY_FIELD =
+            Options.key("hash_key_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field of hash key you want to write 
to redis");
+
+    public static final Option<String> HASH_VALUE_FIELD =
+            Options.key("hash_value_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field of hash value you want to 
write to redis");
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
new file mode 100644
index 0000000000..a02e113cea
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class RedisSourceOptions extends RedisBaseOptions {
+    public enum HashKeyParseMode {
+        ALL,
+        KV;
+    }
+
+    public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE =
+            Options.key("hash_key_parse_mode")
+                    .enumType(HashKeyParseMode.class)
+                    .defaultValue(HashKeyParseMode.ALL)
+                    .withDescription(
+                            "hash key parse mode, support all or kv, default 
value is all");
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index ddb1901205..67774821ed 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
 import java.io.IOException;
@@ -46,7 +46,7 @@ public class RedisSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
     @Override
     public String getPluginName() {
-        return RedisConfig.CONNECTOR_IDENTITY;
+        return RedisBaseOptions.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index 38098a2560..9204f2c602 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -24,7 +24,8 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSinkOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -45,20 +46,26 @@ public class RedisSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, 
RedisConfig.DATA_TYPE)
+                        RedisBaseOptions.HOST,
+                        RedisBaseOptions.PORT,
+                        RedisBaseOptions.KEY,
+                        RedisBaseOptions.DATA_TYPE)
                 .optional(
-                        RedisConfig.MODE,
-                        RedisConfig.AUTH,
-                        RedisConfig.USER,
-                        RedisConfig.KEY_PATTERN,
-                        RedisConfig.FORMAT,
-                        RedisConfig.EXPIRE,
-                        RedisConfig.SUPPORT_CUSTOM_KEY,
-                        RedisConfig.VALUE_FIELD,
-                        RedisConfig.HASH_KEY_FIELD,
-                        RedisConfig.HASH_VALUE_FIELD,
+                        RedisBaseOptions.MODE,
+                        RedisBaseOptions.AUTH,
+                        RedisBaseOptions.USER,
+                        RedisBaseOptions.KEY_PATTERN,
+                        RedisBaseOptions.FORMAT,
+                        RedisSinkOptions.EXPIRE,
+                        RedisSinkOptions.SUPPORT_CUSTOM_KEY,
+                        RedisSinkOptions.VALUE_FIELD,
+                        RedisSinkOptions.HASH_KEY_FIELD,
+                        RedisSinkOptions.HASH_VALUE_FIELD,
                         SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
-                .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
+                .conditional(
+                        RedisBaseOptions.MODE,
+                        RedisBaseOptions.RedisMode.CLUSTER,
+                        RedisBaseOptions.NODES)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 28f5693bb4..71bd16a6c7 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -48,7 +48,7 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public String getPluginName() {
-        return RedisConfig.CONNECTOR_IDENTITY;
+        return RedisBaseOptions.CONNECTOR_IDENTITY;
     }
 
     public RedisSource(ReadonlyConfig readonlyConfig) {
@@ -56,7 +56,7 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
         this.redisParameters.buildWithConfig(readonlyConfig);
         // TODO: use format SPI
         // default use json format
-        if (readonlyConfig.getOptional(RedisConfig.FORMAT).isPresent()) {
+        if (readonlyConfig.getOptional(RedisBaseOptions.FORMAT).isPresent()) {
             if 
(!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
                 throw new RedisConnectorException(
                         SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -67,8 +67,8 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
                                 "Must config schema when format parameter been 
config"));
             }
 
-            RedisConfig.Format format = readonlyConfig.get(RedisConfig.FORMAT);
-            if (RedisConfig.Format.JSON.equals(format)) {
+            RedisBaseOptions.Format format = 
readonlyConfig.get(RedisBaseOptions.FORMAT);
+            if (RedisBaseOptions.Format.JSON.equals(format)) {
                 this.catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
                 this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
                 this.deserializationSchema =
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index c4f9ac099e..69ee36464b 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -25,7 +25,8 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -48,18 +49,21 @@ public class RedisSourceFactory implements 
TableSourceFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        RedisConfig.HOST,
-                        RedisConfig.PORT,
-                        RedisConfig.KEY_PATTERN,
-                        RedisConfig.DATA_TYPE)
+                        RedisBaseOptions.HOST,
+                        RedisBaseOptions.PORT,
+                        RedisBaseOptions.KEY_PATTERN,
+                        RedisBaseOptions.DATA_TYPE)
                 .optional(
-                        RedisConfig.MODE,
-                        RedisConfig.HASH_KEY_PARSE_MODE,
-                        RedisConfig.AUTH,
-                        RedisConfig.USER,
-                        RedisConfig.KEY)
-                .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
-                .bundled(RedisConfig.FORMAT, TableSchemaOptions.SCHEMA)
+                        RedisBaseOptions.MODE,
+                        RedisSourceOptions.HASH_KEY_PARSE_MODE,
+                        RedisBaseOptions.AUTH,
+                        RedisBaseOptions.USER,
+                        RedisBaseOptions.KEY)
+                .conditional(
+                        RedisBaseOptions.MODE,
+                        RedisBaseOptions.RedisMode.CLUSTER,
+                        RedisBaseOptions.NODES)
+                .bundled(RedisBaseOptions.FORMAT, TableSchemaOptions.SCHEMA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index be67c266f9..bdb887c097 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -25,9 +25,9 @@ import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -175,7 +175,7 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
             return;
         }
         for (Map<String, String> recordsMap : values) {
-            if (redisParameters.getHashKeyParseMode() == 
RedisConfig.HashKeyParseMode.KV) {
+            if (redisParameters.getHashKeyParseMode() == 
RedisSourceOptions.HashKeyParseMode.KV) {
                 deserializationSchema.deserialize(
                         JsonUtils.toJsonString(recordsMap).getBytes(), output);
             } else {

Reply via email to