fuweng11 commented on code in PR #9029: URL: https://github.com/apache/inlong/pull/9029#discussion_r1349839557
########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -55,6 +64,9 @@ public class ClsResourceOperator extends AbstractStandaloneSinkResourceOperator { private static final Logger LOG = LoggerFactory.getLogger(ClsResourceOperator.class); + private static final String TOPIC_NAME = "topicName"; + private static final String LOG_SET_ID = "logsetId"; Review Comment: private static final String LOG_SET_ID = "logSetId"; ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -113,6 +120,31 @@ private void createTopicID(SinkInfo sinkInfo) { } } + private String getTopicID(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) + throws TencentCloudSDKException { + String topicID = describeTopicIDByTopicName(sinkInfo, clsDataNode); + if (Strings.isEmpty(topicID)) { + // if topic don't exist,create topic in cls + topicID = createTopicReturnTopicId(clsDataNode, clsSinkDTO); + } + return topicID; + } + + private String createTopicReturnTopicId(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) + throws TencentCloudSDKException { + ClsClient client = getClsClient(clsDataNode); + CreateTopicRequest req = getCreateTopicRequest(clsDataNode, clsSinkDTO); + CreateTopicResponse resp = client.CreateTopic(req); + LOG.info("create cls topic {} success ,topicId {}", clsSinkDTO.getTopicName(), resp.getTopicId()); Review Comment: create cls topic success for topicName = {}, topicId = {} ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -78,32 +90,27 @@ public void createSinkResource(SinkInfo sinkInfo) { LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]"); return; } - this.createTopicID(sinkInfo); + this.createClsResource(sinkInfo); this.assignCluster(sinkInfo); } 
/** * Create cloud log service topic */ - private void createTopicID(SinkInfo sinkInfo) { + private void createClsResource(SinkInfo sinkInfo) { ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo); ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); try { - ClsClient client = getClsClient(clsDataNode); - CreateTopicRequest req = getCreateTopicRequest(clsDataNode, clsSinkDTO); - CreateTopicResponse resp = client.CreateTopic(req); - LOG.info("create cls topic {} success ,topicId {}", clsSinkDTO.getTopicName(), resp.getTopicId()); - // update set topic id into sink info - clsSinkDTO.setTopicId(resp.getTopicId()); + String topicID = getTopicID(sinkInfo, clsDataNode, clsSinkDTO); + clsSinkDTO.setTopicId(topicID); sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO)); // create topic index by tokenizer - this.createTopicIndex(sinkInfo); - StreamSinkEntity streamSinkEntity = new StreamSinkEntity(); - CommonBeanUtils.copyProperties(sinkInfo, streamSinkEntity, true); - streamSinkEntityMapper.updateByIdSelective(streamSinkEntity); + this.createTopicIndex(sinkInfo, clsDataNode); + // update set topic id into sink info + updateSinkInfo(sinkInfo, clsSinkDTO); String info = "success to create cls resource"; sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); - LOG.info("update cls sink = {}info status success ,topicName {}", streamSinkEntity.getSinkName(), + LOG.info("update cls sink = {}info status success ,topicName {}", sinkInfo.getSinkName(), Review Comment: update cls info status success for sinkId= {},topicName ={} ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -78,32 +90,27 @@ public void createSinkResource(SinkInfo sinkInfo) { LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]"); return; } - this.createTopicID(sinkInfo); + this.createClsResource(sinkInfo); 
this.assignCluster(sinkInfo); } /** * Create cloud log service topic Review Comment: createClsResource? ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -136,14 +168,33 @@ private ClsClient getClsClient(ClsDataNodeDTO clsDataNode) { /** * Create topic index by tokenizer */ - private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException { + private void createTopicIndex(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) throws BusinessException { + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); - if (StringUtils.isNotBlank(clsSinkDTO.getTokenizer())) { + if (StringUtils.isEmpty(clsSinkDTO.getTokenizer())) { LOG.warn("topic {} tokenizer is empty", clsSinkDTO.getTopicName()); return; } - ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo); + FullTextInfo topicIndexFullText = getTopicIndexFullText(sinkInfo, clsDataNode); + if (ObjectUtils.anyNotNull(topicIndexFullText)) { + // if topic index exist,update + updateTopicIndex(sinkInfo, clsDataNode); + return; + } ClsClient clsClient = getClsClient(clsDataNode); + CreateIndexRequest req = getCreateIndexRequest(clsSinkDTO); + try { + clsClient.CreateIndex(req); + } catch (TencentCloudSDKException e) { + String errMsg = "Create cls topic index failed: " + e.getMessage(); + LOG.error(errMsg, e); + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); + throw new BusinessException(errMsg); + } + LOG.info("topic {} create index success tokenizer is {}", clsSinkDTO.getTopicName(), clsSinkDTO.getTokenizer()); Review Comment: create index success for topic ={}, tokenizer = {} ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java: ########## @@ -218,14 +219,19 @@ public void deleteOpt(StreamSinkEntity entity, String operator) { } @Override - public Map<String, String> 
parse2IdParams(StreamSinkEntity streamSink, List<String> fields) { + public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields, + DataNodeInfo dataNodeInfo) { Map<String, String> param; try { - param = JsonUtils.parseObject(streamSink.getExtParams(), HashMap.class); - // put group and stream info - assert param != null; + HashMap<String, Object> streamInfoMap = JsonUtils.parseObject(streamSink.getExtParams(), HashMap.class); + param = new HashMap<>(); + assert streamInfoMap != null; + for (String key : streamInfoMap.keySet()) { + param.put(key, String.valueOf(streamInfoMap.get(key))); + } param.put(KEY_GROUP_ID, streamSink.getInlongGroupId()); param.put(KEY_STREAM_ID, streamSink.getInlongStreamId()); + // put group and stream info Review Comment: ```suggestion // put group and stream info param.put(KEY_GROUP_ID, streamSink.getInlongGroupId()); param.put(KEY_STREAM_ID, streamSink.getInlongStreamId()); ``` ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -152,15 +203,86 @@ private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException { CreateIndexRequest req = new CreateIndexRequest(); req.setTopicId(clsSinkDTO.getTopicId()); req.setRule(ruleInfo); + return req; + } + + private void updateTopicIndex(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) { + ClsClient clsClient = getClsClient(clsDataNode); + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); + RuleInfo ruleInfo = new RuleInfo(); + FullTextInfo fullTextInfo1 = new FullTextInfo(); + fullTextInfo1.setTokenizer(clsSinkDTO.getTokenizer()); + ruleInfo.setFullText(fullTextInfo1); + + ModifyIndexRequest req = new ModifyIndexRequest(); + req.setTopicId(clsSinkDTO.getTopicId()); + req.setRule(ruleInfo); try { - clsClient.CreateIndex(req); + clsClient.ModifyIndex(req); } catch (TencentCloudSDKException e) { - String errMsg = "Create cls 
topic index failed: " + e.getMessage(); + String errMsg = "update cls topic index failed: " + e.getMessage(); LOG.error(errMsg, e); sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); throw new BusinessException(errMsg); } - LOG.info("topic {} create index success tokenizer is {}", clsSinkDTO.getTopicName(), clsSinkDTO.getTokenizer()); + } + + /** + * Describe cls topicId by topic name + */ + private String describeTopicIDByTopicName(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) { + ClsClient clsClient = getClsClient(clsDataNode); + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); + Filter[] filters = getDescribeFilters(clsDataNode, clsSinkDTO); + DescribeTopicsRequest req = new DescribeTopicsRequest(); + req.setFilters(filters); + req.setPreciseSearch(PRECISE_SEARCH); + try { + DescribeTopicsResponse describeTopicsResponse = clsClient.DescribeTopics(req); + LOG.info("sink {} describe cls topic success topic count {}", sinkInfo.getSinkName(), + describeTopicsResponse.getTotalCount()); + if (ArrayUtils.isNotEmpty(describeTopicsResponse.getTopics())) { + TopicInfo[] topics = describeTopicsResponse.getTopics(); + return topics[0].getTopicId(); + } + return null; + } catch (TencentCloudSDKException e) { + String errMsg = "describe cls topic failed: " + e.getMessage(); + LOG.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + /** + * Get cls topic index full text + */ + private FullTextInfo getTopicIndexFullText(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) { + ClsClient clsClient = getClsClient(clsDataNode); + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); + + DescribeIndexRequest req = new DescribeIndexRequest(); + req.setTopicId(clsSinkDTO.getTopicId()); + try { + DescribeIndexResponse resp = clsClient.DescribeIndex(req); + return resp.getRule() == null ? 
null : resp.getRule().getFullText(); + } catch (TencentCloudSDKException e) { + String errMsg = "describe cls topic index failed: " + e.getMessage(); + LOG.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + private Filter[] getDescribeFilters(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) { + Filter filter = new Filter(); + String[] filterValues = new String[1]; + filterValues[0] = clsSinkDTO.getTopicName(); + filter.setKey(TOPIC_NAME); + filter.setValues(filterValues); + Filter filter1 = new Filter(); Review Comment: Filter filter1? ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -136,14 +168,33 @@ private ClsClient getClsClient(ClsDataNodeDTO clsDataNode) { /** * Create topic index by tokenizer */ - private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException { + private void createTopicIndex(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) throws BusinessException { + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); - if (StringUtils.isNotBlank(clsSinkDTO.getTokenizer())) { + if (StringUtils.isEmpty(clsSinkDTO.getTokenizer())) { Review Comment: StringUtils.isBlank ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java: ########## @@ -152,15 +203,86 @@ private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException { CreateIndexRequest req = new CreateIndexRequest(); req.setTopicId(clsSinkDTO.getTopicId()); req.setRule(ruleInfo); + return req; + } + + private void updateTopicIndex(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) { + ClsClient clsClient = getClsClient(clsDataNode); + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); + RuleInfo ruleInfo = new RuleInfo(); + FullTextInfo fullTextInfo1 = new FullTextInfo(); + 
fullTextInfo1.setTokenizer(clsSinkDTO.getTokenizer()); + ruleInfo.setFullText(fullTextInfo1); + + ModifyIndexRequest req = new ModifyIndexRequest(); + req.setTopicId(clsSinkDTO.getTopicId()); + req.setRule(ruleInfo); try { - clsClient.CreateIndex(req); + clsClient.ModifyIndex(req); } catch (TencentCloudSDKException e) { - String errMsg = "Create cls topic index failed: " + e.getMessage(); + String errMsg = "update cls topic index failed: " + e.getMessage(); LOG.error(errMsg, e); sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); throw new BusinessException(errMsg); } - LOG.info("topic {} create index success tokenizer is {}", clsSinkDTO.getTopicName(), clsSinkDTO.getTokenizer()); + } + + /** + * Describe cls topicId by topic name + */ + private String describeTopicIDByTopicName(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) { + ClsClient clsClient = getClsClient(clsDataNode); + ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); + Filter[] filters = getDescribeFilters(clsDataNode, clsSinkDTO); + DescribeTopicsRequest req = new DescribeTopicsRequest(); + req.setFilters(filters); + req.setPreciseSearch(PRECISE_SEARCH); + try { + DescribeTopicsResponse describeTopicsResponse = clsClient.DescribeTopics(req); + LOG.info("sink {} describe cls topic success topic count {}", sinkInfo.getSinkName(), + describeTopicsResponse.getTotalCount()); + if (ArrayUtils.isNotEmpty(describeTopicsResponse.getTopics())) { + TopicInfo[] topics = describeTopicsResponse.getTopics(); + return topics[0].getTopicId(); + } + return null; + } catch (TencentCloudSDKException e) { + String errMsg = "describe cls topic failed: " + e.getMessage(); + LOG.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + /** + * Get cls topic index full text + */ + private FullTextInfo getTopicIndexFullText(SinkInfo sinkInfo, ClsDataNodeDTO clsDataNode) { + ClsClient clsClient = getClsClient(clsDataNode); + ClsSinkDTO 
clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), ClsSinkDTO.class); + + DescribeIndexRequest req = new DescribeIndexRequest(); + req.setTopicId(clsSinkDTO.getTopicId()); + try { + DescribeIndexResponse resp = clsClient.DescribeIndex(req); + return resp.getRule() == null ? null : resp.getRule().getFullText(); + } catch (TencentCloudSDKException e) { + String errMsg = "describe cls topic index failed: " + e.getMessage(); + LOG.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + private Filter[] getDescribeFilters(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO) { + Filter filter = new Filter(); + String[] filterValues = new String[1]; + filterValues[0] = clsSinkDTO.getTopicName(); + filter.setKey(TOPIC_NAME); + filter.setValues(filterValues); + Filter filter1 = new Filter(); + String[] filterValues1 = new String[]{clsDataNode.getLogSetId()}; Review Comment: String[] filterValues1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org