This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c39a3d99 [INLONG-9211][Manager] Fix failure to obtain data from Redis sink (#9212)
78c39a3d99 is described below

commit 78c39a3d9940c88fd1656721916fc30ac2cfd263
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Mon Nov 6 22:05:38 2023 +0800

    [INLONG-9211][Manager] Fix failure to obtain data from Redis sink (#9212)
---
 .../postgresql/PostgreSQLResourceOperator.java     |  2 +-
 .../service/sink/redis/RedisSinkOperator.java      | 45 ++++++++++++++++++----
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java
index 7d6cbf42df..379dad813b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java
@@ -100,7 +100,7 @@ public class PostgreSQLResourceOperator implements 
SinkResourceOperator {
         try (Connection conn = 
PostgreSQLJdbcUtils.getConnection(postgreSQLSink.getJdbcUrl(),
                 postgreSQLSink.getUsername(), postgreSQLSink.getPassword())) {
             // 1.If schema not exists,create it
-            PostgreSQLJdbcUtils.createSchema(conn, tableInfo.getTableName(), 
tableInfo.getUserName());
+            PostgreSQLJdbcUtils.createSchema(conn, tableInfo.getSchemaName(), 
tableInfo.getUserName());
             // 2.If table not exists, create it
             PostgreSQLJdbcUtils.createTable(conn, tableInfo);
             // 3.Table exists, add columns - skip the exists columns
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
index e22570942b..0a31f0a93c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -34,6 +35,7 @@ import 
org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -83,22 +85,26 @@ public class RedisSinkOperator extends AbstractSinkOperator 
{
 
         RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
 
-        String clusterMode = sinkRequest.getClusterMode();
-        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+        if (StringUtils.isNotBlank(targetEntity.getDataNodeName())) {
+            RedisDataNodeInfo dataNodeInfo = (RedisDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    targetEntity.getDataNodeName(), 
targetEntity.getSinkType());
+
+            CommonBeanUtils.copyProperties(dataNodeInfo, sinkRequest, true);
+        }
+
+        RedisClusterMode redisClusterMode = 
RedisClusterMode.of(sinkRequest.getClusterMode());
 
         expectNotNull(redisClusterMode,
                 "Redis ClusterMode must in one of " + 
Arrays.toString(RedisClusterMode.values()) + " !");
 
         switch (redisClusterMode) {
             case CLUSTER:
-                String clusterNodes = sinkRequest.getClusterNodes();
-                checkClusterNodes(clusterNodes);
+                checkClusterNodes(sinkRequest.getClusterNodes());
                 break;
             case SENTINEL:
-                String sentinelMasterName = sinkRequest.getMasterName();
-                expectNotEmpty(sentinelMasterName, "Redis MasterName of 
Sentinel cluster must not null!");
-                String sentinelsInfo = sinkRequest.getSentinelsInfo();
-                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel 
cluster must not null!");
+                expectNotEmpty(sinkRequest.getMasterName(), "Redis MasterName 
of Sentinel cluster must not null!");
+                expectNotEmpty(sinkRequest.getSentinelsInfo(),
+                        "Redis sentinelsInfo of Sentinel cluster must not 
null!");
                 break;
             case STANDALONE:
                 String host = sinkRequest.getHost();
@@ -148,6 +154,29 @@ public class RedisSinkOperator extends 
AbstractSinkOperator {
         }
 
         RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getHost())) {
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, 
"redis data node is blank");
+            }
+            RedisDataNodeInfo dataNodeInfo = (RedisDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            String clusterMode = dataNodeInfo.getClusterMode();
+            dto.setClusterMode(clusterMode);
+            switch (RedisClusterMode.of(clusterMode)) {
+                case CLUSTER:
+                    dto.setClusterNodes(dataNodeInfo.getClusterNodes());
+                    break;
+                case SENTINEL:
+                    dto.setMasterName(dataNodeInfo.getMasterName());
+                    dto.setSentinelsInfo(dataNodeInfo.getSentinelsInfo());
+                    break;
+                case STANDALONE:
+                    dto.setHost(dataNodeInfo.getHost());
+                    dto.setPort(dataNodeInfo.getPort());
+                    break;
+            }
+        }
 
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);

Reply via email to