vernedeng commented on code in PR #6046: URL: https://github.com/apache/inlong/pull/6046#discussion_r1018587699
########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java: ########## @@ -67,49 +62,12 @@ public boolean accept(String mqType) { @Override public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) { - String groupId = groupInfo.getInlongGroupId(); - log.info("begin to create kafka resource for groupId={}", groupId); - - InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo; - try { - // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId); - return; - } - for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource()); - } - } catch (Exception e) { - String msg = String.format("failed to create kafka resource for groupId=%s", groupId); - log.error(msg, e); - throw new WorkflowListenerException(msg + ": " + e.getMessage()); - } - log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo); + log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId()); } @Override public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) { - Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null"); - - String groupId = groupInfo.getInlongGroupId(); - log.info("begin to delete kafka resource for groupId={}", groupId); - ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA); - try { - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create kafka topic and subscription as no streams for 
groupId={}", groupId); - return; - } - for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId()); - } - } catch (Exception e) { - log.error("failed to delete kafka resource for groupId=" + groupId, e); - throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage()); - } - log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo); + log.info("skip to delete kafka topic for groupId={}", groupInfo.getInlongGroupId()); Review Comment: why skip deleting the kafka topic directly? ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java: ########## @@ -81,6 +82,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc } InlongGroupInfo groupInfo = form.getGroupInfo(); List<InlongStreamInfo> streamInfos = form.getStreamInfos(); + if (CollectionUtils.isEmpty(streamInfos)) { + LOGGER.warn("not build sort config for groupId={}, as the stream is empty", groupId); Review Comment: ```suggestion LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId); ``` ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java: ########## @@ -96,4 +128,45 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc return ListenerResult.success("success"); } + private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) { + String groupId = groupInfo.getInlongGroupId(); + log.info("success to start stream process for groupId={}", groupId); + + for (InlongStreamInfo stream : streamInfos) { + StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT); + String streamId = stream.getInlongStreamId(); + final String errMsg = "failed to start stream process for 
groupId=" + groupId + " streamId=" + streamId; + + CompletableFuture<WorkflowResult> future = CompletableFuture + .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE) + .whenComplete((result, ex) -> { + if (ex != null) { + log.error(errMsg + ": " + ex.getMessage()); + throw new WorkflowListenerException(errMsg, ex); + } else { + List<TaskResponse> tasks = result.getNewTasks(); + if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) { + log.error(errMsg); + throw new WorkflowListenerException(errMsg); + } + } + }); + try { + future.get(180, TimeUnit.SECONDS); + /*WorkflowResult result = future.get(180, TimeUnit.SECONDS); + List<TaskResponse> tasks = result.getNewTasks(); + if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) { + log.error(errMsg); + throw new WorkflowListenerException(errMsg); + }*/ Review Comment: please remove this commented-out code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org