This is an automated email from the ASF dual-hosted git repository. wakefu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d7aad5440e [INLONG-11790][Manager] CLS uses topic id as the unique key (#11791) d7aad5440e is described below commit d7aad5440ee7d3e19cacbec85462089cc73737e0 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Mon Mar 3 11:55:25 2025 +0800 [INLONG-11790][Manager] CLS uses topic id as the unique key (#11791) --- .../inlong/manager/common/consts/SourceType.java | 1 + .../service/resource/sink/cls/ClsOperator.java | 38 +++++++++++++++------- .../resource/sink/cls/ClsResourceOperator.java | 26 ++++++++++----- .../manager/service/sink/cls/ClsSinkOperator.java | 15 +++++++++ 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java index 263950297d..26cd94f778 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java @@ -49,6 +49,7 @@ public class SourceType extends StreamType { put(FILE, TaskTypeEnum.FILE); put(COS, TaskTypeEnum.COS); + put(SQL, TaskTypeEnum.SQL); put(MYSQL_BINLOG, TaskTypeEnum.BINLOG); put(POSTGRESQL, TaskTypeEnum.POSTGRES); put(ORACLE, TaskTypeEnum.ORACLE); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java index f5162d1cc0..a89adf5d39 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java @@ -58,7 +58,7 @@ public class ClsOperator { @Value("${cls.manager.endpoint}") private String endpoint; private static final Logger LOG = LoggerFactory.getLogger(ClsOperator.class); - private static final String TOPIC_NAME = "topicName"; + private static final String TOPIC_ID = "topicId"; private static final String LOG_SET_ID = "logsetId"; private static final long PRECISE_SEARCH = 1L; @@ -117,12 +117,12 @@ public class ClsOperator { } /** - * Describe cls topicId by topic name + * Describe cls topicName by topic id */ - public String describeTopicIDByTopicName(String topicName, String logSetId, String secretId, String secretKey, + public String describeTopicNameByTopicId(String topicId, String logSetId, String secretId, String secretKey, String region) { ClsClient clsClient = getClsClient(secretId, secretKey, region); - Filter[] filters = getDescribeFilters(topicName, logSetId); + Filter[] filters = getDescribeFilters(topicId, logSetId); DescribeTopicsRequest req = new DescribeTopicsRequest(); req.setFilters(filters); req.setPreciseSearch(PRECISE_SEARCH); @@ -130,7 +130,7 @@ public class ClsOperator { DescribeTopicsResponse describeTopicsResponse = clsClient.DescribeTopics(req); if (ArrayUtils.isNotEmpty(describeTopicsResponse.getTopics())) { TopicInfo[] topics = describeTopicsResponse.getTopics(); - return topics[0].getTopicId(); + return topics[0].getTopicName(); } return null; } catch (Exception e) { @@ -140,17 +140,33 @@ public class ClsOperator { } } - public Filter[] getDescribeFilters(String topicName, String logSetId) { - Filter topicNameFilter = new Filter(); - topicNameFilter.setKey(TOPIC_NAME); - String[] topicNameFilterValues = new String[]{topicName}; - topicNameFilter.setValues(topicNameFilterValues); + public void modifyTopicNameByTopicId(String topicId, String topicName, String secretId, String secretKey, + String region) { + ClsClient clsClient = getClsClient(secretId, secretKey, region); + ModifyTopicRequest req = new ModifyTopicRequest(); + req.setTopicId(topicId); + req.setTopicName(topicName); + try { + ModifyTopicResponse modifyTopicResponse = clsClient.ModifyTopic(req); + LOG.info("modify cls topic name success for topicId={}, topicName={}", topicId, topicName); + } catch (Exception e) { + String errMsg = "modify cls topic name failed: " + e.getMessage(); + LOG.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + public Filter[] getDescribeFilters(String topicId, String logSetId) { + Filter topicIdFilter = new Filter(); + topicIdFilter.setKey(TOPIC_ID); + String[] topicIdFilterValues = new String[]{topicId}; + topicIdFilter.setValues(topicIdFilterValues); Filter logSetIdFilter = new Filter(); logSetIdFilter.setKey(LOG_SET_ID); String[] logSetFilterValues = new String[]{logSetId}; logSetIdFilter.setValues(logSetFilterValues); - return new Filter[]{topicNameFilter, logSetIdFilter}; + return new Filter[]{topicIdFilter, logSetIdFilter}; } /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java index ced68be0ad..8148acdcb9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java @@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Objects; + @Service public class ClsResourceOperator extends AbstractStandaloneSinkResourceOperator { @@ -80,8 +82,7 @@ public class ClsResourceOperator extends AbstractStandaloneSinkResourceOperator ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo); ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); try { - String topicId = getTopicID(clsDataNode, clsSinkDTO); - clsSinkDTO.setTopicId(topicId); + createOrUpdateTopicName(clsDataNode, clsSinkDTO); sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO)); // create topic index by tokenizer clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(), clsSinkDTO.getTopicId(), @@ -101,19 +102,26 @@ public class ClsResourceOperator extends AbstractStandaloneSinkResourceOperator } } - private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) + private void createOrUpdateTopicName(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) throws Exception { - String topicId = clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), - clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), - clsDataNode.getRegion()); - if (StringUtils.isBlank(topicId)) { + String topicName = clsSinkDTO.getTopicName(); + if (StringUtils.isBlank(clsSinkDTO.getTopicId())) { // if topic don't exist, create topic in cls - topicId = clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), + String topicId = clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(), clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(), clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), clsDataNode.getRegion()); + clsSinkDTO.setTopicId(topicId); + } else { + topicName = clsOperator.describeTopicNameByTopicId(clsSinkDTO.getTopicId(), clsDataNode.getLogSetId(), + clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), + clsDataNode.getRegion()); + if (!Objects.equals(topicName, clsSinkDTO.getTopicName())) { + clsOperator.modifyTopicNameByTopicId(clsSinkDTO.getTopicId(), clsSinkDTO.getTopicName(), + clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(), + clsDataNode.getRegion()); + } } - return topicId; } private void updateSinkInfo(SinkInfo sinkInfo, ClsSinkDTO clsSinkDTO) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java index ab6c54fa51..89b580229c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java @@ -45,9 +45,11 @@ import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest; import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.service.resource.sink.cls.ClsOperator; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -74,6 +76,8 @@ public class ClsSinkOperator extends AbstractSinkOperator { private ObjectMapper objectMapper; @Autowired private DataNodeEntityMapper dataNodeEntityMapper; + @Autowired + private ClsOperator clsOperator; @Override protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) { @@ -125,6 +129,17 @@ public class ClsSinkOperator extends AbstractSinkOperator { CommonBeanUtils.copyProperties(entity, sink, true); CommonBeanUtils.copyProperties(dto, sink, true); CommonBeanUtils.copyProperties(clsDataNodeDTO, sink, true); + if (StringUtils.isNotBlank(sink.getTopicId())) { + try { + String topicName = + clsOperator.describeTopicNameByTopicId(sink.getTopicId(), clsDataNodeDTO.getLogSetId(), + clsDataNodeDTO.getManageSecretId(), clsDataNodeDTO.getManageSecretKey(), + clsDataNodeDTO.getRegion()); + sink.setTopicName(topicName); + } catch (Exception e) { + LOGGER.error("get cls topic name failed for sinId={}, topicId={}", sink.getId(), sink.getTopicId(), e); + } + } List<SinkField> sinkFields = getSinkFields(entity.getId()); sink.setSinkFieldList(sinkFields); return sink;