vernedeng commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018587699


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java:
##########
@@ -67,49 +62,12 @@ public boolean accept(String mqType) {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, 
@NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a 
Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = 
streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no 
streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, 
streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for 
groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, 
cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", 
groupInfo.getInlongGroupId());
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String 
operator) {
-        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be 
null");
-
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete kafka resource for groupId={}", groupId);
-        ClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = 
streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no 
streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, 
streamInfo.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            log.error("failed to delete kafka resource for groupId=" + 
groupId, e);
-            throw new WorkflowListenerException("failed to delete kafka 
resource: " + e.getMessage());
-        }
-        log.info("success to delete kafka resource for groupId={}, 
cluster={}", groupId, clusterInfo);
+        log.info("skip to delete kafka topic for groupId={}", 
groupInfo.getInlongGroupId());

Review Comment:
   Why skip deleting the Kafka topic directly?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java:
##########
@@ -81,6 +82,10 @@ public ListenerResult listen(WorkflowContext context) throws 
WorkflowListenerExc
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("not build sort config for groupId={}, as the stream 
is empty", groupId);

Review Comment:
   
   ```suggestion
               LOGGER.warn("do not build sort config for groupId={}, as the 
stream is empty", groupId);
   ```



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,45 @@ public ListenerResult listen(WorkflowContext context) 
throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, 
List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = 
StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for 
groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> 
workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 
1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);
+                /*WorkflowResult result = future.get(180, TimeUnit.SECONDS);
+                List<TaskResponse> tasks = result.getNewTasks();
+                if (TaskStatus.FAILED == tasks.get(tasks.size() - 
1).getStatus()) {
+                    log.error(errMsg);
+                    throw new WorkflowListenerException(errMsg);
+                }*/

Review Comment:
   Please remove this commented-out code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to