fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018602157


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java:
##########
@@ -67,49 +62,12 @@ public boolean accept(String mqType) {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, 
@NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a 
Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = 
streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no 
streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, 
streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for 
groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, 
cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", 
groupInfo.getInlongGroupId());
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String 
operator) {
-        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be 
null");
-
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete kafka resource for groupId={}", groupId);
-        ClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = 
streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no 
streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, 
streamInfo.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            log.error("failed to delete kafka resource for groupId=" + 
groupId, e);
-            throw new WorkflowListenerException("failed to delete kafka 
resource: " + e.getMessage());
-        }
-        log.info("success to delete kafka resource for groupId={}, 
cluster={}", groupId, clusterInfo);
+        log.info("skip to delete kafka topic for groupId={}", 
groupInfo.getInlongGroupId());

Review Comment:
   I fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to