This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a68f4bcd34 [INLONG-9886][Manager] Optimized code 
DataProxyConfigRepositoryV2 (#9888)
a68f4bcd34 is described below

commit a68f4bcd34d8bba8cfb1c113debe69311aa242e8
Author: balloon72 <96562725+balloo...@users.noreply.github.com>
AuthorDate: Thu Mar 28 20:45:49 2024 +0800

    [INLONG-9886][Manager] Optimized code DataProxyConfigRepositoryV2 (#9888)
    
    Co-authored-by: hanmo1 <ISFA-9844>
---
 .../repository/DataProxyConfigRepositoryV2.java    | 27 ++++++----------------
 1 file changed, 7 insertions(+), 20 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
index e169efc9b7..f9b5b6aa67 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryV2.java
@@ -258,10 +258,8 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
             if (StringUtils.equalsIgnoreCase(producerTag, 
Boolean.TRUE.toString()) && StringUtils.isNotBlank(
                     cacheCluster.getClusterTags())) {
                 Set<String> clusterTags = 
Sets.newHashSet(cacheCluster.getClusterTags().split(InlongConstants.COMMA));
-                clusterTags.forEach(clusterTag -> {
-                    cacheClusterMap.computeIfAbsent(clusterTag, k -> new 
HashMap<>())
-                            .computeIfAbsent(cacheCluster.getExtTag(), k -> 
new ArrayList<>()).add(cacheCluster);
-                });
+                clusterTags.forEach(clusterTag -> 
cacheClusterMap.computeIfAbsent(clusterTag, k -> new HashMap<>())
+                        .computeIfAbsent(cacheCluster.getExtTag(), k -> new 
ArrayList<>()).add(cacheCluster));
             }
         }
         // mark cache cluster to proxy cluster
@@ -530,9 +528,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
             Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
             ClusterPageRequest clusterRequest = new ClusterPageRequest();
             List<InlongClusterEntity> clusters = 
clusterMapper.selectByCondition(clusterRequest);
-            clusters.forEach((v) -> {
-                clusterMap.put(v.getName(), v);
-            });
+            clusters.forEach(v -> clusterMap.put(v.getName(), v));
             // prepare stream sink
             SinkPageRequest request = new SinkPageRequest();
             request.setInlongGroupId(inlongGroupId);
@@ -541,10 +537,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
             for (StreamSinkEntity streamSink : streamSinks) {
                 String clusterName = streamSink.getInlongClusterName();
                 InlongClusterEntity cluster = clusterMap.get(clusterName);
-                if (cluster == null) {
-                    continue;
-                }
-                if (!StringUtils.equals(oldClusterTag, 
cluster.getClusterTags())) {
+                if (cluster == null || !StringUtils.equals(oldClusterTag, 
cluster.getClusterTags())) {
                     continue;
                 }
                 String clusterType = cluster.getType();
@@ -556,9 +549,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
                 }
             }
             // update
-            newStreamSinks.forEach((v) -> {
-                streamSinkMapper.insert(v);
-            });
+            newStreamSinks.forEach(v -> streamSinkMapper.insert(v));
             int rowCount = 
inlongGroupMapper.updateByIdentifierSelective(newGroup);
             if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
                 LOGGER.error("inlong group has already updated with group 
id={}, curVersion={}",
@@ -675,9 +666,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
         Map<String, InlongClusterEntity> clusterMap = new HashMap<>();
         ClusterPageRequest clusterRequest = new ClusterPageRequest();
         List<InlongClusterEntity> clusters = 
clusterMapper.selectByCondition(clusterRequest);
-        clusters.forEach((v) -> {
-            clusterMap.put(v.getName(), v);
-        });
+        clusters.forEach(v -> clusterMap.put(v.getName(), v));
         // prepare stream sink
         SinkPageRequest request = new SinkPageRequest();
         request.setInlongGroupId(inlongGroupId);
@@ -694,9 +683,7 @@ public class DataProxyConfigRepositoryV2 implements 
IRepository {
             }
         }
         // delete old stream sink
-        deleteStreamSinks.forEach((v) -> {
-            streamSinkMapper.deleteById(v.getId());
-        });
+        deleteStreamSinks.forEach(v -> streamSinkMapper.deleteById(v.getId()));
         return inlongGroupId;
     }
 }

Reply via email to