This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a623662b [INLONG-9069][Manager] Filter out invalid configs when 
organize SortStandalone configuration (#9072)
40a623662b is described below

commit 40a623662b9bbfd68770a06aed2f7be389f8e1a3
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Oct 18 16:37:05 2023 +0800

    [INLONG-9069][Manager] Filter out invalid configs when organize 
SortStandalone configuration (#9072)
    
    * [INLONG-9069][Manager] Filter out invalid configs when organize 
SortStandalone configuration
    
    * fix
---
 .../inlong/manager/service/core/impl/SortSourceServiceImpl.java       | 4 ++++
 1 file changed, 4 insertions(+)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index c9fb4937ee..530ea89697 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -207,6 +207,7 @@ public class SortSourceServiceImpl implements 
SortSourceService {
         streamSinkMap = new HashMap<>();
         allStreamSinks.stream()
                 .filter(sink -> 
StringUtils.isNotBlank(sink.getSortClusterName()))
+                .filter(sink -> 
Objects.nonNull(sortClusters.get(sink.getSortClusterName())))
                 .filter(sink -> StringUtils.isNotBlank(sink.getSortTaskName()))
                 .forEach(sink -> {
                     Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap 
=
@@ -219,6 +220,8 @@ public class SortSourceServiceImpl implements 
SortSourceService {
         // reload all groups
         groupInfos = configLoader.loadAllGroup()
                 .stream()
+                .filter(group -> StringUtils.isNotBlank(group.getMqResource()))
+                .filter(group -> StringUtils.isNotBlank(group.getClusterTag()))
                 .collect(Collectors.toMap(SortSourceGroupInfo::getGroupId, 
info -> info));
 
         // reload all back up cluster
@@ -234,6 +237,7 @@ public class SortSourceServiceImpl implements 
SortSourceService {
         // reload all streams
         allStreams = configLoader.loadAllStreams()
                 .stream()
+                .filter(stream -> 
StringUtils.isNotBlank(stream.getMqResource()))
                 
.collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
                         
Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
 

Reply via email to