This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new f1c313eea6 [improve] update Redis connector config option (#8631) f1c313eea6 is described below commit f1c313eea6b8635dd928d44a73c3d86fe40a4346 Author: Asish <labsa...@gmail.com> AuthorDate: Tue Feb 11 19:52:15 2025 +0530 [improve] update Redis connector config option (#8631) Co-authored-by: asishupadhyay <akula...@gmail.com> --- .../seatunnel/api/ConnectorOptionCheckTest.java | 2 - .../{RedisConfig.java => RedisBaseOptions.java} | 56 +++---------------- .../seatunnel/redis/config/RedisParameters.java | 62 +++++++++++----------- .../seatunnel/redis/config/RedisSinkOptions.java | 54 +++++++++++++++++++ .../seatunnel/redis/config/RedisSourceOptions.java | 35 ++++++++++++ .../connectors/seatunnel/redis/sink/RedisSink.java | 4 +- .../seatunnel/redis/sink/RedisSinkFactory.java | 33 +++++++----- .../seatunnel/redis/source/RedisSource.java | 10 ++-- .../seatunnel/redis/source/RedisSourceFactory.java | 28 +++++----- .../seatunnel/redis/source/RedisSourceReader.java | 4 +- 10 files changed, 171 insertions(+), 117 deletions(-) diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java index e11291c418..9cf980f4bd 100644 --- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java +++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java @@ -185,7 +185,6 @@ public class ConnectorOptionCheckTest { whiteList.add("PaimonSinkOptions"); whiteList.add("TDengineSourceOptions"); whiteList.add("PulsarSourceOptions"); - whiteList.add("RedisSinkOptions"); whiteList.add("FakeSourceOptions"); whiteList.add("HbaseSinkOptions"); whiteList.add("MongodbSinkOptions"); @@ -230,7 +229,6 @@ public class ConnectorOptionCheckTest { whiteList.add("SocketSourceOptions"); whiteList.add("OpenMldbSourceOptions"); whiteList.add("Web3jSourceOptions"); - whiteList.add("RedisSourceOptions"); whiteList.add("PostgresIncrementalSourceOptions"); whiteList.add("SqlServerIncrementalSourceOptions"); whiteList.add("OracleIncrementalSourceOptions"); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java similarity index 67% rename from seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java rename to seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java index c9809868dc..892d110cc2 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options; import java.util.List; -public class RedisConfig { +public class RedisBaseOptions { public static final String CONNECTOR_IDENTITY = "Redis"; @@ -31,11 +31,6 @@ public class RedisConfig { CLUSTER; } - public enum HashKeyParseMode { - ALL, - KV; - } - public static final Option<String> HOST = Options.key("host") .stringType() @@ -85,16 +80,16 @@ public class RedisConfig { .noDefaultValue() .withDescription("redis data types, support string hash list set zset."); - public static final Option<RedisConfig.Format> FORMAT = + public static final Option<RedisBaseOptions.Format> FORMAT = Options.key("format") - .enumType(RedisConfig.Format.class) - .defaultValue(RedisConfig.Format.JSON) + .enumType(RedisBaseOptions.Format.class) + .defaultValue(RedisBaseOptions.Format.JSON) .withDescription( "the format of upstream data, now only support json and text, default json."); - public static final Option<RedisConfig.RedisMode> MODE = + public static final Option<RedisBaseOptions.RedisMode> MODE = Options.key("mode") - .enumType(RedisConfig.RedisMode.class) + .enumType(RedisBaseOptions.RedisMode.class) .defaultValue(RedisMode.SINGLE) .withDescription( "redis mode, support single or cluster, default value is single"); @@ -106,19 +101,6 @@ public class RedisConfig { .withDescription( "redis nodes information, used in cluster mode, must like as the following format: [host1:port1, host2:port2]"); - public static final Option<RedisConfig.HashKeyParseMode> HASH_KEY_PARSE_MODE = - Options.key("hash_key_parse_mode") - .enumType(RedisConfig.HashKeyParseMode.class) - .defaultValue(HashKeyParseMode.ALL) - .withDescription( - "hash key parse mode, support all or kv, default value is all"); - - public static final Option<Long> EXPIRE = - Options.key("expire") - .longType() - .defaultValue(-1L) - .withDescription("Set redis expiration time."); - public static final Option<Integer> BATCH_SIZE = Options.key("batch_size") .intType() @@ -127,32 +109,6 @@ public class RedisConfig { "batch_size is used to control the size of a batch of data during read and write operations" + ",default 10"); - public static final Option<Boolean> SUPPORT_CUSTOM_KEY = - Options.key("support_custom_key") - .booleanType() - .defaultValue(false) - .withDescription( - "if true, the key can be customized by the field value in the upstream data."); - - public static final Option<String> VALUE_FIELD = - Options.key("value_field") - .stringType() - .noDefaultValue() - .withDescription( - "The field of value you want to write to redis, support string list set zset"); - - public static final Option<String> HASH_KEY_FIELD = - Options.key("hash_key_field") - .stringType() - .noDefaultValue() - .withDescription("The field of hash key you want to write to redis"); - - public static final Option<String> HASH_VALUE_FIELD = - Options.key("hash_value_field") - .stringType() - .noDefaultValue() - .withDescription("The field of hash value you want to write to redis"); - public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index 6dff3cba71..4425be182f 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -52,11 +52,11 @@ public class RedisParameters implements Serializable { private String keysPattern; private String keyField; private RedisDataType redisDataType; - private RedisConfig.RedisMode mode; - private RedisConfig.HashKeyParseMode hashKeyParseMode; + private RedisBaseOptions.RedisMode mode; + private RedisSourceOptions.HashKeyParseMode hashKeyParseMode; private List<String> redisNodes = Collections.emptyList(); - private long expire = RedisConfig.EXPIRE.defaultValue(); - private int batchSize = RedisConfig.BATCH_SIZE.defaultValue(); + private long expire = RedisSinkOptions.EXPIRE.defaultValue(); + private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue(); private Boolean supportCustomKey; private String valueField; private String hashKeyField; @@ -66,63 +66,63 @@ public class RedisParameters implements Serializable { public void buildWithConfig(ReadonlyConfig config) { // set host - this.host = config.get(RedisConfig.HOST); + this.host = config.get(RedisBaseOptions.HOST); // set port - this.port = config.get(RedisConfig.PORT); + this.port = config.get(RedisBaseOptions.PORT); // set db_num - this.dbNum = config.get(RedisConfig.DB_NUM); + this.dbNum = config.get(RedisBaseOptions.DB_NUM); // set hash key mode - this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE); + this.hashKeyParseMode = config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE); // set expire - this.expire = config.get(RedisConfig.EXPIRE); + this.expire = config.get(RedisSinkOptions.EXPIRE); // set auth - if (config.getOptional(RedisConfig.AUTH).isPresent()) { - this.auth = config.get(RedisConfig.AUTH); + if (config.getOptional(RedisBaseOptions.AUTH).isPresent()) { + this.auth = config.get(RedisBaseOptions.AUTH); } // set user - if (config.getOptional(RedisConfig.USER).isPresent()) { - this.user = config.get(RedisConfig.USER); + if (config.getOptional(RedisBaseOptions.USER).isPresent()) { + this.user = config.get(RedisBaseOptions.USER); } // set mode - this.mode = config.get(RedisConfig.MODE); + this.mode = config.get(RedisBaseOptions.MODE); // set redis nodes information - if (config.getOptional(RedisConfig.NODES).isPresent()) { - this.redisNodes = config.get(RedisConfig.NODES); + if (config.getOptional(RedisBaseOptions.NODES).isPresent()) { + this.redisNodes = config.get(RedisBaseOptions.NODES); } // set key - if (config.getOptional(RedisConfig.KEY).isPresent()) { - this.keyField = config.get(RedisConfig.KEY); + if (config.getOptional(RedisBaseOptions.KEY).isPresent()) { + this.keyField = config.get(RedisBaseOptions.KEY); } // set keysPattern - if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) { - this.keysPattern = config.get(RedisConfig.KEY_PATTERN); + if (config.getOptional(RedisBaseOptions.KEY_PATTERN).isPresent()) { + this.keysPattern = config.get(RedisBaseOptions.KEY_PATTERN); } // set redis data type verification factory createAndPrepareSource - this.redisDataType = config.get(RedisConfig.DATA_TYPE); + this.redisDataType = config.get(RedisBaseOptions.DATA_TYPE); // Indicates the number of keys to attempt to return per iteration.default 10 - this.batchSize = config.get(RedisConfig.BATCH_SIZE); + this.batchSize = config.get(RedisBaseOptions.BATCH_SIZE); // set support custom key - if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) { - this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY); + if (config.getOptional(RedisSinkOptions.SUPPORT_CUSTOM_KEY).isPresent()) { + this.supportCustomKey = config.get(RedisSinkOptions.SUPPORT_CUSTOM_KEY); } // set value field - if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) { - this.valueField = config.get(RedisConfig.VALUE_FIELD); + if (config.getOptional(RedisSinkOptions.VALUE_FIELD).isPresent()) { + this.valueField = config.get(RedisSinkOptions.VALUE_FIELD); } // set hash key field - if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) { - this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD); + if (config.getOptional(RedisSinkOptions.HASH_KEY_FIELD).isPresent()) { + this.hashKeyField = config.get(RedisSinkOptions.HASH_KEY_FIELD); } // set hash value field - if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) { - this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD); + if (config.getOptional(RedisSinkOptions.HASH_VALUE_FIELD).isPresent()) { + this.hashValueField = config.get(RedisSinkOptions.HASH_VALUE_FIELD); } } public RedisClient buildRedisClient() { Jedis jedis = this.buildJedis(); this.redisVersion = extractRedisVersion(jedis); - if (mode.equals(RedisConfig.RedisMode.SINGLE)) { + if (mode.equals(RedisBaseOptions.RedisMode.SINGLE)) { return new RedisSingleClient(this, jedis, redisVersion); } else { return new RedisClusterClient(this, jedis, redisVersion); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java new file mode 100644 index 0000000000..960204321d --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.redis.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class RedisSinkOptions extends RedisBaseOptions { + + public static final Option<Long> EXPIRE = + Options.key("expire") + .longType() + .defaultValue(-1L) + .withDescription("Set redis expiration time."); + + public static final Option<Boolean> SUPPORT_CUSTOM_KEY = + Options.key("support_custom_key") + .booleanType() + .defaultValue(false) + .withDescription( + "if true, the key can be customized by the field value in the upstream data."); + public static final Option<String> VALUE_FIELD = + Options.key("value_field") + .stringType() + .noDefaultValue() + .withDescription( + "The field of value you want to write to redis, support string list set zset"); + public static final Option<String> HASH_KEY_FIELD = + Options.key("hash_key_field") + .stringType() + .noDefaultValue() + .withDescription("The field of hash key you want to write to redis"); + + public static final Option<String> HASH_VALUE_FIELD = + Options.key("hash_value_field") + .stringType() + .noDefaultValue() + .withDescription("The field of hash value you want to write to redis"); +} diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java new file mode 100644 index 0000000000..a02e113cea --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.redis.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class RedisSourceOptions extends RedisBaseOptions { + public enum HashKeyParseMode { + ALL, + KV; + } + + public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE = + Options.key("hash_key_parse_mode") + .enumType(HashKeyParseMode.class) + .defaultValue(HashKeyParseMode.ALL) + .withDescription( + "hash key parse mode, support all or kv, default value is all"); +} diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java index ddb1901205..67774821ed 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; import java.io.IOException; @@ -46,7 +46,7 @@ public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void> @Override public String getPluginName() { - return RedisConfig.CONNECTOR_IDENTITY; + return RedisBaseOptions.CONNECTOR_IDENTITY; } @Override diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java index 38098a2560..9204f2c602 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java @@ -24,7 +24,8 @@ import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; -import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSinkOptions; import com.google.auto.service.AutoService; @@ -45,20 +46,26 @@ public class RedisSinkFactory implements TableSinkFactory { public OptionRule optionRule() { return OptionRule.builder() .required( - RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, RedisConfig.DATA_TYPE) + RedisBaseOptions.HOST, + RedisBaseOptions.PORT, + RedisBaseOptions.KEY, + RedisBaseOptions.DATA_TYPE) .optional( - RedisConfig.MODE, - RedisConfig.AUTH, - RedisConfig.USER, - RedisConfig.KEY_PATTERN, - RedisConfig.FORMAT, - RedisConfig.EXPIRE, - RedisConfig.SUPPORT_CUSTOM_KEY, - RedisConfig.VALUE_FIELD, - RedisConfig.HASH_KEY_FIELD, - RedisConfig.HASH_VALUE_FIELD, + RedisBaseOptions.MODE, + RedisBaseOptions.AUTH, + RedisBaseOptions.USER, + RedisBaseOptions.KEY_PATTERN, + RedisBaseOptions.FORMAT, + RedisSinkOptions.EXPIRE, + RedisSinkOptions.SUPPORT_CUSTOM_KEY, + RedisSinkOptions.VALUE_FIELD, + RedisSinkOptions.HASH_KEY_FIELD, + RedisSinkOptions.HASH_VALUE_FIELD, SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) - .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) + .conditional( + RedisBaseOptions.MODE, + RedisBaseOptions.RedisMode.CLUSTER, + RedisBaseOptions.NODES) .build(); } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java index 28f5693bb4..71bd16a6c7 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java @@ -32,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; -import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; import org.apache.seatunnel.format.json.JsonDeserializationSchema; @@ -48,7 +48,7 @@ public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> { @Override public String getPluginName() { - return RedisConfig.CONNECTOR_IDENTITY; + return RedisBaseOptions.CONNECTOR_IDENTITY; } public RedisSource(ReadonlyConfig readonlyConfig) { @@ -56,7 +56,7 @@ public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> { this.redisParameters.buildWithConfig(readonlyConfig); // TODO: use format SPI // default use json format - if (readonlyConfig.getOptional(RedisConfig.FORMAT).isPresent()) { + if (readonlyConfig.getOptional(RedisBaseOptions.FORMAT).isPresent()) { if (!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { throw new RedisConnectorException( SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, @@ -67,8 +67,8 @@ public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> { "Must config schema when format parameter been config")); } - RedisConfig.Format format = readonlyConfig.get(RedisConfig.FORMAT); - if (RedisConfig.Format.JSON.equals(format)) { + RedisBaseOptions.Format format = readonlyConfig.get(RedisBaseOptions.FORMAT); + if (RedisBaseOptions.Format.JSON.equals(format)) { this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig); this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); this.deserializationSchema = diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java index c4f9ac099e..69ee36464b 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java @@ -25,7 +25,8 @@ import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; -import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions; import com.google.auto.service.AutoService; @@ -48,18 +49,21 @@ public class RedisSourceFactory implements TableSourceFactory { public OptionRule optionRule() { return OptionRule.builder() .required( - RedisConfig.HOST, - RedisConfig.PORT, - RedisConfig.KEY_PATTERN, - RedisConfig.DATA_TYPE) + RedisBaseOptions.HOST, + RedisBaseOptions.PORT, + RedisBaseOptions.KEY_PATTERN, + RedisBaseOptions.DATA_TYPE) .optional( - RedisConfig.MODE, - RedisConfig.HASH_KEY_PARSE_MODE, - RedisConfig.AUTH, - RedisConfig.USER, - RedisConfig.KEY) - .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) - .bundled(RedisConfig.FORMAT, TableSchemaOptions.SCHEMA) + RedisBaseOptions.MODE, + RedisSourceOptions.HASH_KEY_PARSE_MODE, + RedisBaseOptions.AUTH, + RedisBaseOptions.USER, + RedisBaseOptions.KEY) + .conditional( + RedisBaseOptions.MODE, + RedisBaseOptions.RedisMode.CLUSTER, + RedisBaseOptions.NODES) + .bundled(RedisBaseOptions.FORMAT, TableSchemaOptions.SCHEMA) .build(); } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java index be67c266f9..bdb887c097 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java @@ -25,9 +25,9 @@ import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient; -import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions; import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; import org.apache.commons.collections4.CollectionUtils; @@ -175,7 +175,7 @@ public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> { return; } for (Map<String, String> recordsMap : values) { - if (redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV) { + if (redisParameters.getHashKeyParseMode() == RedisSourceOptions.HashKeyParseMode.KV) { deserializationSchema.deserialize( JsonUtils.toJsonString(recordsMap).getBytes(), output); } else {