This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 40a623662b [INLONG-9069][Manager] Filter out invalid configs when organize SortStandalone configuration (#9072)
40a623662b is described below

commit 40a623662b9bbfd68770a06aed2f7be389f8e1a3
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Oct 18 16:37:05 2023 +0800

    [INLONG-9069][Manager] Filter out invalid configs when organize SortStandalone configuration (#9072)

    * [INLONG-9069][Manager] Filter out invalid configs when organize SortStandalone configuration

    * fix
---
 .../inlong/manager/service/core/impl/SortSourceServiceImpl.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index c9fb4937ee..530ea89697 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -207,6 +207,7 @@ public class SortSourceServiceImpl implements SortSourceService {
         streamSinkMap = new HashMap<>();
         allStreamSinks.stream()
                 .filter(sink -> StringUtils.isNotBlank(sink.getSortClusterName()))
+                .filter(sink -> Objects.nonNull(sortClusters.get(sink.getSortClusterName())))
                 .filter(sink -> StringUtils.isNotBlank(sink.getSortTaskName()))
                 .forEach(sink -> {
                     Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap =
@@ -219,6 +220,8 @@ public class SortSourceServiceImpl implements SortSourceService {
         // reload all groups
         groupInfos = configLoader.loadAllGroup()
                 .stream()
+                .filter(group -> StringUtils.isNotBlank(group.getMqResource()))
+                .filter(group -> StringUtils.isNotBlank(group.getClusterTag()))
                 .collect(Collectors.toMap(SortSourceGroupInfo::getGroupId, info -> info));
         // reload all back up cluster
@@ -234,6 +237,7 @@ public class SortSourceServiceImpl implements SortSourceService {
         // reload all streams
         allStreams = configLoader.loadAllStreams()
                 .stream()
+                .filter(stream -> StringUtils.isNotBlank(stream.getMqResource()))
                 .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
                         Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));