This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 77338950f [INLONG-7242][Manager] Support register and manage the resource of Redis sink (#7243)
77338950f is described below

commit 77338950f861d947086da231f876f191a6fbb6e1
Author: feat <featzh...@outlook.com>
AuthorDate: Thu Feb 2 16:00:27 2023 +0800

    [INLONG-7242][Manager] Support register and manage the resource of Redis sink (#7243)
---
 .../inlong/manager/common/consts/DataNodeType.java |   1 +
 .../inlong/manager/common/consts/SinkType.java     |   1 +
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   3 +-
 .../inlong/manager/common/util/Preconditions.java  |  13 ++
 .../plugin/flink/enums/ConnectorJarType.java       |   1 +
 .../manager/pojo/node/redis/RedisDataNodeDTO.java  |  63 ++++++++
 .../manager/pojo/node/redis/RedisDataNodeInfo.java |  47 ++++++
 .../pojo/node/redis/RedisDataNodeRequest.java}     |  30 ++--
 .../manager/pojo/sink/redis/RedisClusterMode.java} |  32 +++--
 .../manager/pojo/sink/redis/RedisDataType.java}    |  34 +++--
 .../pojo/sink/redis/RedisSchemaMapMode.java}       |  19 +--
 .../inlong/manager/pojo/sink/redis/RedisSink.java  | 124 ++++++++++++++++
 .../manager/pojo/sink/redis/RedisSinkDTO.java      | 129 +++++++++++++++++
 .../manager/pojo/sink/redis/RedisSinkRequest.java  | 108 ++++++++++++++
 .../manager/pojo/sort/util/LoadNodeUtils.java      | 118 +++++++++++++++
 .../service/node/redis/RedisDataNodeOperator.java  |  84 +++++++++++
 .../resource/sink/redis/RedisResourceOperator.java |  48 +++++++
 .../service/sink/redis/RedisSinkOperator.java      | 158 +++++++++++++++++++++
 .../manager/service/sink/RedisSinkServiceTest.java |  95 +++++++++++++
 19 files changed, 1061 insertions(+), 47 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index a32adac37..757561d5b 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -30,5 +30,6 @@ public class DataNodeType {
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
     public static final String MYSQL = "MYSQL";
     public static final String STARROCKS = "STARROCKS";
+    public static final String REDIS = "REDIS";
 
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index a2da09886..c7e293ed6 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -38,4 +38,5 @@ public class SinkType {
     public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
     public static final String DORIS = "DORIS";
     public static final String STARROCKS = "STARROCKS";
+    public static final String REDIS = "REDIS";
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 9c532dddc..d690e3f5d 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -40,6 +40,8 @@ public enum ErrorCodeEnum {
     RECORD_NOT_FOUND(109, "The record does not exist"),
     USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"),
     RECORD_IN_USED(111, "The record is in use"),
+    IP_EMPTY(112, "The IP is is empty"),
+    PORT_EMPTY(113, "The PORT is is empty"),
 
     GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation 
authority"),
     GROUP_DUPLICATE(1002, "Inlong group already exists"),
@@ -143,7 +145,6 @@ public enum ErrorCodeEnum {
     CONSUME_PERMISSION_DENIED(3005, "No permission to access this inlong 
consume"),
 
     AUDIT_ID_TYPE_NOT_SUPPORTED(4001, "Audit id type '%s' not supported"),
-
     ;
 
     private final int code;
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
index b0326b18c..ce2ae4485 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
@@ -90,6 +90,14 @@ public class Preconditions {
         expectTrue(collection != null && !collection.isEmpty(), errMsg);
     }
 
+    public static void expectNotEmpty(String[] collection, String errMsg) {
+        expectTrue(collection != null && collection.length != 0, errMsg);
+    }
+
+    public static void expectNotEmpty(String[] collection, Supplier<String> 
errMsg) {
+        expectTrue(collection != null && collection.length != 0, errMsg);
+    }
+
     public static void expectNotEmpty(Map<?, ?> map, String errMsg) {
         expectTrue(map != null && !map.isEmpty(), errMsg);
     }
@@ -127,6 +135,11 @@ public class Preconditions {
             throw new IllegalArgumentException(errMsg);
         }
     }
+    public static void expectNotBlank(String obj, String errMsg) {
+        if (StringUtils.isBlank(obj)) {
+            throw new IllegalArgumentException(errMsg);
+        }
+    }
 
     public static void expectNotBlank(String obj, ErrorCodeEnum errorCodeEnum) 
{
         if (StringUtils.isBlank(obj)) {
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
index de2259f67..52a581a50 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -72,6 +72,7 @@ public enum ConnectorJarType {
     HUDI_SINK("hudiLoad", "hudi"),
 
     HDFS_SINK("fileSystemLoad", ""),
+    REDIS_SINK("redisLoad", "redis"),
 
     ;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java
new file mode 100644
index 000000000..1c6b64cd8
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Redis data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@ApiModel("Redis data node info")
+public class RedisDataNodeDTO {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisDataNodeDTO.class);
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static RedisDataNodeDTO getFromRequest(RedisDataNodeRequest 
request) throws Exception {
+        return RedisDataNodeDTO.builder()
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static RedisDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, RedisDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for Redis node: 
%s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java
new file mode 100644
index 000000000..3772f67d7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Redis data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.REDIS)
+@ApiModel("Redis data node info")
+public class RedisDataNodeInfo extends DataNodeInfo {
+
+    public RedisDataNodeInfo() {
+        this.setType(DataNodeType.REDIS);
+    }
+
+    @Override
+    public RedisDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, RedisDataNodeRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java
similarity index 54%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java
index a32adac37..5d782c6f0 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java
@@ -15,20 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 
 /**
- * Constants of data node.
+ * Redis data node request
  */
-public class DataNodeType {
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.REDIS)
+@ApiModel("Redis data node request")
+public class RedisDataNodeRequest extends DataNodeRequest {
 
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String HUDI = "HUDI";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
-    public static final String STARROCKS = "STARROCKS";
+    public RedisDataNodeRequest() {
+        this.setType(DataNodeType.REDIS);
+    }
 
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java
similarity index 57%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java
index a32adac37..f9d4cd413 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java
@@ -15,20 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.redis;
 
 /**
- * Constants of data node.
+ * The cluster mode of Redis.
  */
-public class DataNodeType {
+public enum RedisClusterMode {
 
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String HUDI = "HUDI";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
-    public static final String STARROCKS = "STARROCKS";
+    STANDALONE("standalone"),
+    CLUSTER("cluster"),
+    SENTINEL("sentinel"),
+    ;
 
+    private final String key;
+
+    private RedisClusterMode(String key) {
+        this.key = key;
+    }
+
+    public static RedisClusterMode of(String key) {
+        for (RedisClusterMode redisClusterMode : RedisClusterMode.values()) {
+            if (key != null && redisClusterMode.key.equals(key.toLowerCase())) 
{
+                return redisClusterMode;
+            }
+        }
+        return null;
+    }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java
similarity index 53%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java
index a32adac37..f74c92b56 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java
@@ -15,20 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import java.util.Arrays;
+import java.util.HashSet;
 
 /**
- * Constants of data node.
+ * The data type of Redis.
  */
-public class DataNodeType {
+public enum RedisDataType {
+
+    HASH(
+            RedisSchemaMapMode.DYNAMIC,
+            RedisSchemaMapMode.STATIC_KV_PAIR,
+            RedisSchemaMapMode.STATIC_PREFIX_MATCH),
+    BITMAP(
+            RedisSchemaMapMode.DYNAMIC),
+    PLAIN(
+            RedisSchemaMapMode.STATIC_PREFIX_MATCH);
+
+    private final HashSet<RedisSchemaMapMode> mapModes;
 
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String HUDI = "HUDI";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
-    public static final String STARROCKS = "STARROCKS";
+    private RedisDataType(RedisSchemaMapMode... modes) {
+        this.mapModes = new HashSet<>(Arrays.asList(modes));
+    }
 
+    public HashSet<RedisSchemaMapMode> getMapModes() {
+        return mapModes;
+    }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java
similarity index 60%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java
index a32adac37..b449fd4f7 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java
@@ -15,20 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.redis;
 
 /**
- * Constants of data node.
+ * The mapMode between SQL column and Redis data-type.
  */
-public class DataNodeType {
-
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String HUDI = "HUDI";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
-    public static final String STARROCKS = "STARROCKS";
-
+public enum RedisSchemaMapMode {
+    DYNAMIC,
+    STATIC_PREFIX_MATCH,
+    STATIC_KV_PAIR;
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java
new file mode 100644
index 000000000..ff19f870f
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Redis sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Redis sink info")
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisSink extends StreamSink {
+
+    @ApiModelProperty("Redis cluster mode")
+    private String clusterMode;
+
+    @ApiModelProperty("Redis database id")
+    private Integer database;
+
+    @ApiModelProperty("Redis data type")
+    private String dataType;
+
+    @ApiModelProperty("Redis schema mapping mode")
+    private String schemaMapMode;
+
+    @ApiModelProperty("Password for Redis accessing")
+    private String password;
+
+    @ApiModelProperty("Database name")
+    private String databaseName;
+
+    @ApiModelProperty("Expire time of Redis row")
+    private Integer ttl;
+
+    @ApiModelProperty("The timeout of Redis client")
+    private Integer timeout;
+
+    @ApiModelProperty("The socket timeout of redis client")
+    private Integer soTimeout;
+
+    @ApiModelProperty("The max total of sink client")
+    private Integer maxTotal;
+
+    @ApiModelProperty("The max idle of sink client")
+    private Integer maxIdle;
+
+    @ApiModelProperty("The min idle of sink client")
+    private Integer minIdle;
+
+    @ApiModelProperty("The max retry time")
+    private Integer maxRetries;
+
+    @ApiModelProperty("The host of Redis server")
+    private String host;
+
+    @ApiModelProperty("The port of Redis server")
+    private Integer port;
+
+    @ApiModelProperty("The master name of Redis sentinel cluster")
+    private String sentinelMasterName;
+
+    @ApiModelProperty("The sentinels info of Redis sentinel cluster")
+    private String sentinelsInfo;
+
+    /**
+     * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+     * If server is not cluster mode, server address format eg: 127.0.0.1:8080 
.
+     */
+    @ApiModelProperty("The cluster nodes of Redis cluster")
+    private String clusterNodes;
+
+    @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH 
data-type")
+    private String formatDataEncoding;
+
+    @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+    private String formatDataType;
+
+    @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH 
data-type")
+    private Boolean formatIgnoreParseError;
+
+    @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH 
data-type")
+    private String formatDataSeparator;
+
+    public RedisSink() {
+        this.setSinkType(SinkType.REDIS);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, RedisSinkRequest::new);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
new file mode 100644
index 000000000..22888179d
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+import java.util.Map;
+
+/**
+ * Sink info of Redis
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class RedisSinkDTO {
+
+    @ApiModelProperty("Redis cluster mode")
+    private String clusterMode;
+
+    @ApiModelProperty("Redis database id")
+    private Integer database;
+
+    @ApiModelProperty("Redis data type")
+    private String dataType;
+
+    @ApiModelProperty("Redis schema mapping mode")
+    private String schemaMapMode;
+
+    @ApiModelProperty("Password for Redis accessing")
+    private String password;
+
+    @ApiModelProperty("Database name")
+    private String databaseName;
+
+    @ApiModelProperty("Expire time of Redis row")
+    private Integer ttl;
+
+    @ApiModelProperty("The timeout of Redis client")
+    private Integer timeout;
+
+    @ApiModelProperty("The socket timeout of redis client")
+    private Integer soTimeout;
+
+    @ApiModelProperty("The max total of sink client")
+    private Integer maxTotal;
+
+    @ApiModelProperty("The max idle of sink client")
+    private Integer maxIdle;
+
+    @ApiModelProperty("The min idle of sink client")
+    private Integer minIdle;
+
+    @ApiModelProperty("The max retry time")
+    private Integer maxRetries;
+
+    @ApiModelProperty("The host of Redis server")
+    private String host;
+
+    @ApiModelProperty("The port of Redis server")
+    private Integer port;
+
+    @ApiModelProperty("The master name of Redis sentinel cluster")
+    private String sentinelMasterName;
+
+    private String sentinelsInfo;
+
+    /**
+     * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+     * If server is not cluster mode, server address format eg: 127.0.0.1:8080 
.
+     */
+    @ApiModelProperty("The cluster nodes of Redis cluster")
+    private String clusterNodes;
+
+    @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH 
data-type")
+    private String formatDataEncoding;
+
+    @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+    private String formatDataType;
+
+    @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH 
data-type")
+    private Boolean formatIgnoreParseError;
+
+    @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH 
data-type")
+    private String formatDataSeparator;
+    @ApiModelProperty("Properties for Redis")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static RedisSinkDTO getFromRequest(RedisSinkRequest request) throws 
Exception {
+        return CommonBeanUtils.copyProperties(request, RedisSinkDTO::new);
+    }
+
+    public static RedisSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, RedisSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Redis SinkDTO failure: 
%s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java
new file mode 100644
index 000000000..0ea2e84c9
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Redis sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Redis sink request")
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Redis cluster mode")
+    private String clusterMode;
+
+    @ApiModelProperty("Redis database id")
+    private Integer database;
+
+    @ApiModelProperty("Redis data type")
+    private String dataType;
+
+    @ApiModelProperty("Redis schema mapping mode")
+    private String schemaMapMode;
+
+    @ApiModelProperty("Password for Redis accessing")
+    private String password;
+
+    @ApiModelProperty("Database name")
+    private String databaseName;
+
+    @ApiModelProperty("Expire time of Redis row")
+    private Integer ttl;
+
+    @ApiModelProperty("The timeout of Redis client")
+    private Integer timeout;
+
+    @ApiModelProperty("The socket timeout of redis client")
+    private Integer soTimeout;
+
+    @ApiModelProperty("The max total of sink client")
+    private Integer maxTotal;
+
+    @ApiModelProperty("The max idle of sink client")
+    private Integer maxIdle;
+
+    @ApiModelProperty("The min idle of sink client")
+    private Integer minIdle;
+
+    @ApiModelProperty("The max retry time")
+    private Integer maxRetries;
+
+    @ApiModelProperty("The host of Redis server")
+    private String host;
+
+    @ApiModelProperty("The port of Redis server")
+    private Integer port;
+
+    @ApiModelProperty("The master name of Redis sentinel cluster")
+    private String sentinelMasterName;
+
+    @ApiModelProperty("The sentinels info of Redis sentinel cluster")
+    private String sentinelsInfo;
+
+    /**
+     * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+     * If server is not cluster mode, server address format eg: 127.0.0.1:8080 
.
+     */
+    @ApiModelProperty("The cluster nodes of Redis cluster")
+    private String clusterNodes;
+
+    @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH 
data-type")
+    private String formatDataEncoding;
+
+    @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+    private String formatDataType;
+
+    @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH 
data-type")
+    private Boolean formatIgnoreParseError;
+
+    @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH 
data-type")
+    private String formatDataSeparator;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 1eb3e1398..2c49597c6 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
 import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
 import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
 import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
@@ -55,6 +56,7 @@ import 
org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
@@ -70,6 +72,7 @@ import 
org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
@@ -145,6 +148,8 @@ public class LoadNodeUtils {
                 return createLoadNode((DorisSink) streamSink, fieldInfos, 
fieldRelations, properties);
             case SinkType.STARROCKS:
                 return createLoadNode((StarRocksSink) streamSink, fieldInfos, 
fieldRelations, properties);
+            case SinkType.REDIS:
+                return createLoadNode((RedisSink) streamSink, fieldInfos, 
fieldRelations, properties);
             default:
                 throw new BusinessException(String.format("Unsupported 
sinkType=%s to create load node", sinkType));
         }
@@ -380,6 +385,64 @@ public class LoadNodeUtils {
                 starRocksSink.getTablePattern());
     }
 
+    /**
+     * Create load node of Redis.
+     *
+     * @param redisSink the Redis sink whose settings are copied into the node
+     * @param fieldInfos field list handed to the load node unchanged
+     * @param fieldRelations field relations handed to the load node unchanged
+     * @param properties extra properties handed to the load node unchanged
+     * @return a {@link RedisLoadNode} whose first two arguments are both the sink name
+     */
+    private static LoadNode createLoadNode(
+            RedisSink redisSink,
+            List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations,
+            Map<String, String> properties) {
+        // Neither InLongMsg wrapping nor ignoring of parse errors is requested for this format.
+        Format valueFormat = parsingFormat(
+                redisSink.getFormatDataType(),
+                false,
+                redisSink.getFormatDataSeparator(),
+                false);
+        String sinkName = redisSink.getSinkName();
+
+        // The three arguments after fieldRelations are intentionally left null.
+        return new RedisLoadNode(
+                sinkName,
+                sinkName,
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                redisSink.getClusterMode(),
+                redisSink.getDataType(),
+                redisSink.getSchemaMapMode(),
+                redisSink.getHost(),
+                redisSink.getPort(),
+                redisSink.getClusterNodes(),
+                redisSink.getSentinelMasterName(),
+                redisSink.getSentinelsInfo(),
+                redisSink.getDatabase(),
+                redisSink.getPassword(),
+                redisSink.getTtl(),
+                valueFormat,
+                redisSink.getTimeout(),
+                redisSink.getSoTimeout(),
+                redisSink.getMaxTotal(),
+                redisSink.getMaxIdle(),
+                redisSink.getMinIdle(),
+                redisSink.getMaxRetries());
+    }
+
     /**
      * Create load node of Iceberg.
      */
@@ -651,4 +714,59 @@ public class LoadNodeUtils {
         }
     }
 
+    /**
+     * Build the {@link Format} that matches a data type name.
+     *
+     * @param formatName        data serialization, support: csv, json, canal, avro, etc
+     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
+     * @param separatorStr      the separator of data content; only read for CSV, where an all-digit
+     *                          value is converted to the character with that code
+     * @param ignoreParseErrors whether ignore deserialization error data; applied to the CSV, JSON
+     *                          and Debezium JSON formats
+     * @return the format for serialized content
+     * @throws IllegalArgumentException if the resolved data type has no branch in this method
+     */
+    private static Format parsingFormat(
+            String formatName,
+            boolean wrapWithInlongMsg,
+            String separatorStr,
+            boolean ignoreParseErrors) {
+        DataTypeEnum resolvedType = DataTypeEnum.forType(formatName);
+        Format payloadFormat;
+        switch (resolvedType) {
+            case CSV:
+                // A purely numeric separator is a character code, not a literal string.
+                String separator = StringUtils.isNumeric(separatorStr)
+                        ? Character.toString((char) Integer.parseInt(separatorStr))
+                        : separatorStr;
+                CsvFormat csv = new CsvFormat(separator);
+                csv.setIgnoreParseErrors(ignoreParseErrors);
+                payloadFormat = csv;
+                break;
+            case AVRO:
+                payloadFormat = new AvroFormat();
+                break;
+            case JSON:
+                JsonFormat json = new JsonFormat();
+                json.setIgnoreParseErrors(ignoreParseErrors);
+                payloadFormat = json;
+                break;
+            case CANAL:
+                payloadFormat = new CanalJsonFormat();
+                break;
+            case DEBEZIUM_JSON:
+                DebeziumJsonFormat debeziumJson = new DebeziumJsonFormat();
+                debeziumJson.setIgnoreParseErrors(ignoreParseErrors);
+                payloadFormat = debeziumJson;
+                break;
+            case RAW:
+                payloadFormat = new RawFormat();
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported dataType=%s", resolvedType));
+        }
+        if (!wrapWithInlongMsg) {
+            return payloadFormat;
+        }
+        return new InLongMsgFormat(payloadFormat, false);
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java
new file mode 100644
index 000000000..94930b4cd
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.redis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node operator for Redis: converts between {@link DataNodeEntity} and the Redis data node
+ * info/request objects.
+ */
+@Service
+public class RedisDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operator handles the given data node type.
+     *
+     * @param dataNodeType data node type to test, may be null
+     * @return true only when it equals {@link #getDataNodeType()}
+     */
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.REDIS;
+    }
+
+    /**
+     * Build a {@link RedisDataNodeInfo} from the stored entity.
+     *
+     * @param entity stored data node
+     * @return node info filled from the entity and, when extParams is not blank, from the DTO parsed out of it
+     * @throws BusinessException with {@code DATA_NODE_NOT_FOUND} if the entity is null
+     */
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        RedisDataNodeInfo redisDataNodeInfo = new RedisDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, redisDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            // Properties copied from the DTO are applied after, and so on top of, the entity's.
+            RedisDataNodeDTO dto = RedisDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, redisDataNodeInfo);
+        }
+        return redisDataNodeInfo;
+    }
+
+    /**
+     * Copy the request into the target entity and store the Redis-specific DTO as JSON in extParams.
+     *
+     * @param request must be a {@link RedisDataNodeRequest}
+     * @param targetEntity entity to fill
+     * @throws BusinessException if the DTO cannot be built or serialized
+     */
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        RedisDataNodeRequest redisDataNodeRequest = (RedisDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(redisDataNodeRequest, targetEntity, true);
+        try {
+            RedisDataNodeDTO dto = RedisDataNodeDTO.getFromRequest(redisDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // The rethrown exception only carries the message, so keep the stack trace in the log.
+            LOGGER.error("failed to build extParams for Redis node", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for Redis node: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java
new file mode 100644
index 000000000..264d8bf1d
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.redis;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redis resource operator
+ */
+@Service
+public class RedisResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisResourceOperator.class);
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    /**
+     * Create Redis table according to the sink config
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOGGER.info("It is not need to create redis table!");
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
new file mode 100644
index 000000000..884fc64fd
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static 
org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static 
org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static 
org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static 
org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static 
org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + 
Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = 
sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of 
Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel 
cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+
+                expectNotEmpty(host, "Redis server host must not null!");
+                expectTrue(
+                        port != null && port > 1 && port < PORT_MAX_VALUE,
+                        "The port of the redis server must be greater than 0 
and less than 65535!");
+                break;
+        }
+        RedisDataType dataType = 
RedisDataType.valueOf(sinkRequest.getDataType());
+        expectNotNull(dataType, "Redis DataType must not null");
+
+        RedisSchemaMapMode mapMode = 
RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode());
+        expectTrue(dataType.getMapModes().contains(mapMode),
+                "Redis schemaMapMode '" + mapMode + "' is not supported in '" 
+ dataType + "'");
+
+        try {
+            RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Redis SinkDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
+    private void checkClusterNodes(String clusterNodes) {
+        expectNotBlank(clusterNodes, "the nodes of Redis cluster must not 
null");
+        String[] nodeArray = clusterNodes.split(",");
+        expectNotEmpty(nodeArray, "the nodes of Redis cluster must not null");
+
+        for (String node : nodeArray) {
+            expectNotBlank(node, "Redis server host must not null!");
+            String[] ipPort = node.split(":");
+            expectTrue(ipPort.length == 2, "The ip and port of Redis server 
must be in form: ip:port");
+            expectNotBlank(ipPort[0], IP_EMPTY);
+            expectNotBlank(ipPort[1], PORT_EMPTY);
+        }
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        RedisSink sink = new RedisSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams());
+
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java
new file mode 100644
index 000000000..9f16c7370
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Tests saving, reading, updating and deleting a Redis stream sink through {@link StreamSinkService}.
+ */
+public class RedisSinkServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalStreamId = "stream1_hudi";
+    private final String globalOperator = "admin";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save the stream, then a standalone-mode Redis sink with the given name.
+     *
+     * @param sinkName name of the sink to save
+     * @return the id returned by the sink service
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+
+        RedisSinkRequest redisRequest = new RedisSinkRequest();
+        // identity of the sink
+        redisRequest.setId((int) (Math.random() * 100000 + 1));
+        redisRequest.setSinkName(sinkName);
+        redisRequest.setSinkType(SinkType.REDIS);
+        redisRequest.setInlongGroupId(globalGroupId);
+        redisRequest.setInlongStreamId(globalStreamId);
+        // connection settings
+        redisRequest.setClusterMode(RedisClusterMode.STANDALONE.name());
+        redisRequest.setHost("demo-host");
+        redisRequest.setPort(6300);
+        // data layout
+        redisRequest.setDataType("HASH");
+        redisRequest.setSchemaMapMode("DYNAMIC");
+
+        return sinkService.save(redisRequest, globalOperator);
+    }
+
+    /**
+     * A saved sink gets an id and can be deleted.
+     */
+    @Test
+    public void testSaveAndDelete() {
+        Integer savedId = this.saveSink("default1");
+        Assertions.assertNotNull(savedId);
+        Assertions.assertTrue(sinkService.delete(savedId, false, globalOperator));
+    }
+
+    /**
+     * A saved sink can be read back by id and carries the group id it was saved with.
+     */
+    @Test
+    public void testListByIdentifier() {
+        Integer savedId = this.saveSink("default2");
+        StreamSink loaded = sinkService.get(savedId);
+        Assertions.assertEquals(globalGroupId, loaded.getInlongGroupId());
+        sinkService.delete(savedId, false, globalOperator);
+    }
+
+    /**
+     * A request generated from a loaded Redis sink can be sent back as an update.
+     */
+    @Test
+    public void testGetAndUpdate() {
+        Integer savedId = this.saveSink("default3");
+        StreamSink loaded = sinkService.get(savedId);
+        Assertions.assertEquals(globalGroupId, loaded.getInlongGroupId());
+
+        SinkRequest updateRequest = ((RedisSink) loaded).genSinkRequest();
+        Assertions.assertTrue(sinkService.update(updateRequest, globalOperator));
+
+        sinkService.delete(savedId, false, globalOperator);
+    }
+
+}

Reply via email to