This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2e70a12a2f [Manager] Fix sort standalone get kafka config error (#10106) 2e70a12a2f is described below commit 2e70a12a2f95e9c8eacdc6ce91afa6f4297e2df3 Author: castor <58140421+castor...@users.noreply.github.com> AuthorDate: Sat May 4 14:25:39 2024 +0800 [Manager] Fix sort standalone get kafka config error (#10106) Co-authored-by: castorqin <qhj00...@qq.com> --- .../service/node/kafka/KafkaDataNodeOperator.java | 14 ++++++++++++++ .../service/sink/kafka/KafkaSinkOperator.java | 22 ++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java index ae91b2f394..4fbc740d36 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java @@ -40,6 +40,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; +import java.util.Map; + /** * Kafka data node operator */ @@ -48,6 +50,9 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataNodeOperator.class); + private static final String bootstrapServers = "bootstrap.servers"; + private static final String clientId = "client.id"; + @Autowired private ObjectMapper objectMapper; @@ -79,6 +84,15 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator { return kafkaDataNodeInfo; } + @Override + public Map<String, String> parse2SinkParams(DataNodeInfo info) { + Map<String, String> params = super.parse2SinkParams(info); + KafkaDataNodeInfo kafkaDataNodeInfo = (KafkaDataNodeInfo) info; + params.put(bootstrapServers, kafkaDataNodeInfo.getBootstrapServers()); + params.put(clientId, kafkaDataNodeInfo.getClientId()); + return params; + } + @Override protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { KafkaDataNodeRequest nodeRequest = (KafkaDataNodeRequest) request; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java index 4357556732..d7fa197c9f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -30,6 +31,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO; import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; /** * Kafka sink operator @@ -46,6 +49,8 @@ public class KafkaSinkOperator extends AbstractSinkOperator { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperator.class); + private static final String topic = "topic"; + @Autowired private ObjectMapper objectMapper; @@ -75,6 +80,23 @@ public class KafkaSinkOperator extends AbstractSinkOperator { } } + @Override + public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields, + DataNodeInfo dataNodeInfo) { + + Map<String, String> params = super.parse2IdParams(streamSink, fields, dataNodeInfo); + + KafkaSinkDTO kafkaSinkDTO; + try { + kafkaSinkDTO = objectMapper.readValue(streamSink.getExtParams(), KafkaSinkDTO.class); + } catch (JsonProcessingException e) { + LOGGER.error("parse kafka sink dto error", e); + return params; + } + params.put(topic, kafkaSinkDTO.getTopicName()); + return params; + } + @Override public StreamSink getFromEntity(StreamSinkEntity entity) { KafkaSink sink = new KafkaSink();