This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a68f4bcd34 [INLONG-9886][Manager] Optimized code DataProxyConfigRepositoryV2 (#9888) a68f4bcd34 is described below commit a68f4bcd34d8bba8cfb1c113debe69311aa242e8 Author: balloon72 <96562725+balloo...@users.noreply.github.com> AuthorDate: Thu Mar 28 20:45:49 2024 +0800 [INLONG-9886][Manager] Optimized code DataProxyConfigRepositoryV2 (#9888) Co-authored-by: hanmo1 <ISFA-9844> --- .../repository/DataProxyConfigRepositoryV2.java | 27 ++++++---------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java index e169efc9b7..f9b5b6aa67 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java @@ -258,10 +258,8 @@ public class DataProxyConfigRepositoryV2 implements IRepository { if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString()) && StringUtils.isNotBlank( cacheCluster.getClusterTags())) { Set<String> clusterTags = Sets.newHashSet(cacheCluster.getClusterTags().split(InlongConstants.COMMA)); - clusterTags.forEach(clusterTag -> { - cacheClusterMap.computeIfAbsent(clusterTag, k -> new HashMap<>()) - .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster); - }); + clusterTags.forEach(clusterTag -> cacheClusterMap.computeIfAbsent(clusterTag, k -> new HashMap<>()) + .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster)); } } // mark cache cluster to proxy cluster @@ -530,9 +528,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository { Map<String, InlongClusterEntity> clusterMap = new HashMap<>(); ClusterPageRequest clusterRequest = new ClusterPageRequest(); List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest); - clusters.forEach((v) -> { - clusterMap.put(v.getName(), v); - }); + clusters.forEach(v -> clusterMap.put(v.getName(), v)); // prepare stream sink SinkPageRequest request = new SinkPageRequest(); request.setInlongGroupId(inlongGroupId); @@ -541,10 +537,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository { for (StreamSinkEntity streamSink : streamSinks) { String clusterName = streamSink.getInlongClusterName(); InlongClusterEntity cluster = clusterMap.get(clusterName); - if (cluster == null) { - continue; - } - if (!StringUtils.equals(oldClusterTag, cluster.getClusterTags())) { + if (cluster == null || !StringUtils.equals(oldClusterTag, cluster.getClusterTags())) { continue; } String clusterType = cluster.getType(); @@ -556,9 +549,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository { } } // update - newStreamSinks.forEach((v) -> { - streamSinkMapper.insert(v); - }); + newStreamSinks.forEach(v -> streamSinkMapper.insert(v)); int rowCount = inlongGroupMapper.updateByIdentifierSelective(newGroup); if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { LOGGER.error("inlong group has already updated with group id={}, curVersion={}", @@ -675,9 +666,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository { Map<String, InlongClusterEntity> clusterMap = new HashMap<>(); ClusterPageRequest clusterRequest = new ClusterPageRequest(); List<InlongClusterEntity> clusters = clusterMapper.selectByCondition(clusterRequest); - clusters.forEach((v) -> { - clusterMap.put(v.getName(), v); - }); + clusters.forEach(v -> clusterMap.put(v.getName(), v)); // prepare stream sink SinkPageRequest request = new SinkPageRequest(); request.setInlongGroupId(inlongGroupId); @@ -694,9 +683,7 @@ public class DataProxyConfigRepositoryV2 implements IRepository { } } // delete old stream sink - deleteStreamSinks.forEach((v) -> { - streamSinkMapper.deleteById(v.getId()); - }); + deleteStreamSinks.forEach(v -> streamSinkMapper.deleteById(v.getId())); return inlongGroupId; } }