This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 78c39a3d99 [INLONG-9211][Manager] Fix failure to obtain data from Redis sink (#9212) 78c39a3d99 is described below commit 78c39a3d9940c88fd1656721916fc30ac2cfd263 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Mon Nov 6 22:05:38 2023 +0800 [INLONG-9211][Manager] Fix failure to obtain data from Redis sink (#9212) --- .../postgresql/PostgreSQLResourceOperator.java | 2 +- .../service/sink/redis/RedisSinkOperator.java | 45 ++++++++++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java index 7d6cbf42df..379dad813b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLResourceOperator.java @@ -100,7 +100,7 @@ public class PostgreSQLResourceOperator implements SinkResourceOperator { try (Connection conn = PostgreSQLJdbcUtils.getConnection(postgreSQLSink.getJdbcUrl(), postgreSQLSink.getUsername(), postgreSQLSink.getPassword())) { // 1.If schema not exists,create it - PostgreSQLJdbcUtils.createSchema(conn, tableInfo.getTableName(), tableInfo.getUserName()); + PostgreSQLJdbcUtils.createSchema(conn, tableInfo.getSchemaName(), tableInfo.getUserName()); // 2.If table not exists, create it PostgreSQLJdbcUtils.createTable(conn, tableInfo); // 3.Table exists, add columns - skip the exists columns diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java index e22570942b..0a31f0a93c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -34,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -83,22 +85,26 @@ public class RedisSinkOperator extends AbstractSinkOperator { RedisSinkRequest sinkRequest = (RedisSinkRequest) request; - String clusterMode = sinkRequest.getClusterMode(); - RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode); + if (StringUtils.isNotBlank(targetEntity.getDataNodeName())) { + RedisDataNodeInfo dataNodeInfo = (RedisDataNodeInfo) dataNodeHelper.getDataNodeInfo( + targetEntity.getDataNodeName(), targetEntity.getSinkType()); + + CommonBeanUtils.copyProperties(dataNodeInfo, sinkRequest, true); + } + + RedisClusterMode redisClusterMode = RedisClusterMode.of(sinkRequest.getClusterMode()); expectNotNull(redisClusterMode, "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !"); switch (redisClusterMode) { case CLUSTER: - String clusterNodes = sinkRequest.getClusterNodes(); - checkClusterNodes(clusterNodes); + checkClusterNodes(sinkRequest.getClusterNodes()); break; case SENTINEL: - String sentinelMasterName = sinkRequest.getMasterName(); - expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!"); - String sentinelsInfo = sinkRequest.getSentinelsInfo(); - expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!"); + expectNotEmpty(sinkRequest.getMasterName(), "Redis MasterName of Sentinel cluster must not null!"); + expectNotEmpty(sinkRequest.getSentinelsInfo(), + "Redis sentinelsInfo of Sentinel cluster must not null!"); break; case STANDALONE: String host = sinkRequest.getHost(); @@ -148,6 +154,29 @@ public class RedisSinkOperator extends AbstractSinkOperator { } RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams()); + if (StringUtils.isBlank(dto.getHost())) { + if (StringUtils.isBlank(entity.getDataNodeName())) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, "redis data node is blank"); + } + RedisDataNodeInfo dataNodeInfo = (RedisDataNodeInfo) dataNodeHelper.getDataNodeInfo( + entity.getDataNodeName(), entity.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); + String clusterMode = dataNodeInfo.getClusterMode(); + dto.setClusterMode(clusterMode); + switch (RedisClusterMode.of(clusterMode)) { + case CLUSTER: + dto.setClusterNodes(dataNodeInfo.getClusterNodes()); + break; + case SENTINEL: + dto.setMasterName(dataNodeInfo.getMasterName()); + dto.setSentinelsInfo(dataNodeInfo.getSentinelsInfo()); + break; + case STANDALONE: + dto.setHost(dataNodeInfo.getHost()); + dto.setPort(dataNodeInfo.getPort()); + break; + } + } CommonBeanUtils.copyProperties(entity, sink, true); CommonBeanUtils.copyProperties(dto, sink, true);