This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e70a12a2f [Manager] Fix sort standalone get kafka config error (#10106)
2e70a12a2f is described below

commit 2e70a12a2f95e9c8eacdc6ce91afa6f4297e2df3
Author: castor <58140421+castor...@users.noreply.github.com>
AuthorDate: Sat May 4 14:25:39 2024 +0800

    [Manager] Fix sort standalone get kafka config error (#10106)
    
    Co-authored-by: castorqin <qhj00...@qq.com>
---
 .../service/node/kafka/KafkaDataNodeOperator.java  | 14 ++++++++++++++
 .../service/sink/kafka/KafkaSinkOperator.java      | 22 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
index ae91b2f394..4fbc740d36 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
@@ -40,6 +40,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
+import java.util.Map;
+
 /**
  * Kafka data node operator
  */
@@ -48,6 +50,9 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataNodeOperator.class);
 
+    private static final String bootstrapServers = "bootstrap.servers";
+    private static final String clientId = "client.id";
+
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -79,6 +84,15 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator {
         return kafkaDataNodeInfo;
     }
 
+    @Override
+    public Map<String, String> parse2SinkParams(DataNodeInfo info) {
+        Map<String, String> params = super.parse2SinkParams(info);
+        KafkaDataNodeInfo kafkaDataNodeInfo = (KafkaDataNodeInfo) info;
+        params.put(bootstrapServers, kafkaDataNodeInfo.getBootstrapServers());
+        params.put(clientId, kafkaDataNodeInfo.getClientId());
+        return params;
+    }
+
     @Override
     protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
         KafkaDataNodeRequest nodeRequest = (KafkaDataNodeRequest) request;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index 4357556732..d7fa197c9f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -30,6 +31,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Kafka sink operator
@@ -46,6 +49,8 @@ public class KafkaSinkOperator extends AbstractSinkOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperator.class);
 
+    private static final String topic = "topic";
+
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -75,6 +80,23 @@ public class KafkaSinkOperator extends AbstractSinkOperator {
         }
     }
 
+    @Override
+    public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields,
+            DataNodeInfo dataNodeInfo) {
+
+        Map<String, String> params = super.parse2IdParams(streamSink, fields, dataNodeInfo);
+
+        KafkaSinkDTO kafkaSinkDTO;
+        try {
+            kafkaSinkDTO = objectMapper.readValue(streamSink.getExtParams(), KafkaSinkDTO.class);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("parse kafka sink dto error", e);
+            return params;
+        }
+        params.put(topic, kafkaSinkDTO.getTopicName());
+        return params;
+    }
+
     @Override
     public StreamSink getFromEntity(StreamSinkEntity entity) {
         KafkaSink sink = new KafkaSink();

Reply via email to