This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d7aad5440e [INLONG-11790][Manager] CLS uses topic id as the unique key (#11791)
d7aad5440e is described below

commit d7aad5440ee7d3e19cacbec85462089cc73737e0
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon Mar 3 11:55:25 2025 +0800

    [INLONG-11790][Manager] CLS uses topic id as the unique key (#11791)
---
 .../inlong/manager/common/consts/SourceType.java   |  1 +
 .../service/resource/sink/cls/ClsOperator.java     | 38 +++++++++++++++-------
 .../resource/sink/cls/ClsResourceOperator.java     | 26 ++++++++++-----
 .../manager/service/sink/cls/ClsSinkOperator.java  | 15 +++++++++
 4 files changed, 60 insertions(+), 20 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 263950297d..26cd94f778 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -49,6 +49,7 @@ public class SourceType extends StreamType {
 
             put(FILE, TaskTypeEnum.FILE);
             put(COS, TaskTypeEnum.COS);
+            put(SQL, TaskTypeEnum.SQL);
             put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
             put(POSTGRESQL, TaskTypeEnum.POSTGRES);
             put(ORACLE, TaskTypeEnum.ORACLE);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
index f5162d1cc0..a89adf5d39 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
@@ -58,7 +58,7 @@ public class ClsOperator {
     @Value("${cls.manager.endpoint}")
     private String endpoint;
     private static final Logger LOG = 
LoggerFactory.getLogger(ClsOperator.class);
-    private static final String TOPIC_NAME = "topicName";
+    private static final String TOPIC_ID = "topicId";
     private static final String LOG_SET_ID = "logsetId";
     private static final long PRECISE_SEARCH = 1L;
 
@@ -117,12 +117,12 @@ public class ClsOperator {
     }
 
     /**
-     * Describe cls topicId by topic name
+     * Describe cls topicName by topic id
      */
-    public String describeTopicIDByTopicName(String topicName, String 
logSetId, String secretId, String secretKey,
+    public String describeTopicNameByTopicId(String topicId, String logSetId, 
String secretId, String secretKey,
             String region) {
         ClsClient clsClient = getClsClient(secretId, secretKey, region);
-        Filter[] filters = getDescribeFilters(topicName, logSetId);
+        Filter[] filters = getDescribeFilters(topicId, logSetId);
         DescribeTopicsRequest req = new DescribeTopicsRequest();
         req.setFilters(filters);
         req.setPreciseSearch(PRECISE_SEARCH);
@@ -130,7 +130,7 @@ public class ClsOperator {
             DescribeTopicsResponse describeTopicsResponse = 
clsClient.DescribeTopics(req);
             if (ArrayUtils.isNotEmpty(describeTopicsResponse.getTopics())) {
                 TopicInfo[] topics = describeTopicsResponse.getTopics();
-                return topics[0].getTopicId();
+                return topics[0].getTopicName();
             }
             return null;
         } catch (Exception e) {
@@ -140,17 +140,33 @@ public class ClsOperator {
         }
     }
 
-    public Filter[] getDescribeFilters(String topicName, String logSetId) {
-        Filter topicNameFilter = new Filter();
-        topicNameFilter.setKey(TOPIC_NAME);
-        String[] topicNameFilterValues = new String[]{topicName};
-        topicNameFilter.setValues(topicNameFilterValues);
+    public void modifyTopicNameByTopicId(String topicId, String topicName, 
String secretId, String secretKey,
+            String region) {
+        ClsClient clsClient = getClsClient(secretId, secretKey, region);
+        ModifyTopicRequest req = new ModifyTopicRequest();
+        req.setTopicId(topicId);
+        req.setTopicName(topicName);
+        try {
+            ModifyTopicResponse modifyTopicResponse = 
clsClient.ModifyTopic(req);
+            LOG.info("modify cls topic name success for topicId={}, 
topicName={}", topicId, topicName);
+        } catch (Exception e) {
+            String errMsg = "modify cls topic name failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    public Filter[] getDescribeFilters(String topicId, String logSetId) {
+        Filter topicIdFilter = new Filter();
+        topicIdFilter.setKey(TOPIC_ID);
+        String[] topicIdFilterValues = new String[]{topicId};
+        topicIdFilter.setValues(topicIdFilterValues);
 
         Filter logSetIdFilter = new Filter();
         logSetIdFilter.setKey(LOG_SET_ID);
         String[] logSetFilterValues = new String[]{logSetId};
         logSetIdFilter.setValues(logSetFilterValues);
-        return new Filter[]{topicNameFilter, logSetIdFilter};
+        return new Filter[]{topicIdFilter, logSetIdFilter};
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
index ced68be0ad..8148acdcb9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Objects;
+
 @Service
 public class ClsResourceOperator extends 
AbstractStandaloneSinkResourceOperator {
 
@@ -80,8 +82,7 @@ public class ClsResourceOperator extends 
AbstractStandaloneSinkResourceOperator
         ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
         ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), 
ClsSinkDTO.class);
         try {
-            String topicId = getTopicID(clsDataNode, clsSinkDTO);
-            clsSinkDTO.setTopicId(topicId);
+            createOrUpdateTopicName(clsDataNode, clsSinkDTO);
             sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO));
             // create topic index by tokenizer
             clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(), 
clsSinkDTO.getTopicId(),
@@ -101,19 +102,26 @@ public class ClsResourceOperator extends 
AbstractStandaloneSinkResourceOperator
         }
     }
 
-    private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO 
clsSinkDTO)
+    private void createOrUpdateTopicName(ClsDataNodeDTO clsDataNode, 
ClsSinkDTO clsSinkDTO)
             throws Exception {
-        String topicId = 
clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(), 
clsDataNode.getLogSetId(),
-                clsDataNode.getManageSecretId(), 
clsDataNode.getManageSecretKey(),
-                clsDataNode.getRegion());
-        if (StringUtils.isBlank(topicId)) {
+        String topicName = clsSinkDTO.getTopicName();
+        if (StringUtils.isBlank(clsSinkDTO.getTopicId())) {
             // if topic don't exist, create topic in cls
-            topicId = 
clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), 
clsDataNode.getLogSetId(),
+            String topicId = 
clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), 
clsDataNode.getLogSetId(),
                     clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(), 
clsDataNode.getManageSecretId(),
                     clsDataNode.getManageSecretKey(),
                     clsDataNode.getRegion());
+            clsSinkDTO.setTopicId(topicId);
+        } else {
+            topicName = 
clsOperator.describeTopicNameByTopicId(clsSinkDTO.getTopicId(), 
clsDataNode.getLogSetId(),
+                    clsDataNode.getManageSecretId(), 
clsDataNode.getManageSecretKey(),
+                    clsDataNode.getRegion());
+            if (!Objects.equals(topicName, clsSinkDTO.getTopicName())) {
+                clsOperator.modifyTopicNameByTopicId(clsSinkDTO.getTopicId(), 
clsSinkDTO.getTopicName(),
+                        clsDataNode.getManageSecretId(), 
clsDataNode.getManageSecretKey(),
+                        clsDataNode.getRegion());
+            }
         }
-        return topicId;
     }
 
     private void updateSinkInfo(SinkInfo sinkInfo, ClsSinkDTO clsSinkDTO) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index ab6c54fa51..89b580229c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -45,9 +45,11 @@ import 
org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.resource.sink.cls.ClsOperator;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -74,6 +76,8 @@ public class ClsSinkOperator extends AbstractSinkOperator {
     private ObjectMapper objectMapper;
     @Autowired
     private DataNodeEntityMapper dataNodeEntityMapper;
+    @Autowired
+    private ClsOperator clsOperator;
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
@@ -125,6 +129,17 @@ public class ClsSinkOperator extends AbstractSinkOperator {
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         CommonBeanUtils.copyProperties(clsDataNodeDTO, sink, true);
+        if (StringUtils.isNotBlank(sink.getTopicId())) {
+            try {
+                String topicName =
+                        
clsOperator.describeTopicNameByTopicId(sink.getTopicId(), 
clsDataNodeDTO.getLogSetId(),
+                                clsDataNodeDTO.getManageSecretId(), 
clsDataNodeDTO.getManageSecretKey(),
+                                clsDataNodeDTO.getRegion());
+                sink.setTopicName(topicName);
+            } catch (Exception e) {
+                LOGGER.error("get cls topic name failed for sinId={}, 
topicId={}", sink.getId(), sink.getTopicId(), e);
+            }
+        }
         List<SinkField> sinkFields = getSinkFields(entity.getId());
         sink.setSinkFieldList(sinkFields);
         return sink;

Reply via email to