This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new de894ed9d [INLONG-6827][Manager] Optimize the config managerment of 
SortStandalone (#6828)
de894ed9d is described below

commit de894ed9d56720052b66721bbe421e1bc0b263ae
Author: vernedeng <deng...@pku.edu.cn>
AuthorDate: Wed Dec 14 12:39:05 2022 +0800

    [INLONG-6827][Manager] Optimize the config managerment of SortStandalone 
(#6828)
---
 .../service/core/impl/SortClusterServiceImpl.java  | 69 ++++++++++++++--------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index e55aa3ea8..f3541316f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -107,9 +108,9 @@ public class SortClusterServiceImpl implements 
SortClusterService {
     public void reload() {
         LOGGER.debug("start to reload sort config");
         try {
-            reloadAllClusterConfigV2();
+            reloadAllClusterConfig();
         } catch (Throwable t) {
-            LOGGER.error(t.getMessage(), t);
+            LOGGER.error("fail to reload cluster config", t);
         }
         LOGGER.debug("end to reload config");
     }
@@ -161,7 +162,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 .build();
     }
 
-    private void reloadAllClusterConfigV2() {
+    private void reloadAllClusterConfig() {
         // load all fields info
         List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
         fieldMap = new HashMap<>();
@@ -174,13 +175,16 @@ public class SortClusterServiceImpl implements 
SortClusterService {
         // get all task under a given cluster, has been reduced into cluster 
and task.
         List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
         Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
-                .filter(dto -> dto.getSortClusterName() != null)
+                .filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName())
+                        && StringUtils.isNotBlank(dto.getSortTaskName())
+                        && StringUtils.isNotBlank(dto.getDataNodeName())
+                        && StringUtils.isNotBlank(dto.getSinkType()))
                 
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
 
         // get all stream sinks
         Map<String, List<StreamSinkEntity>> task2AllStreams = 
sinkEntities.stream()
                 .filter(entity -> 
StringUtils.isNotBlank(entity.getInlongClusterName()))
-                .collect(Collectors.groupingBy(StreamSinkEntity::getSinkName));
+                
.collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName));
 
         // get all data nodes and group by node name
         List<DataNodeEntity> dataNodeEntities = 
sortConfigLoader.loadAllDataNodeEntity();
@@ -199,7 +203,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
         clusterTaskMap.forEach((clusterName, taskList) -> {
             try {
-                SortClusterConfig config = 
this.getConfigByClusterNameV2(clusterName,
+                SortClusterConfig config = 
this.getConfigByClusterName(clusterName,
                         taskList, task2AllStreams, task2DataNodeMap);
                 String jsonStr = GSON.toJson(config);
                 String md5 = DigestUtils.md5Hex(jsonStr);
@@ -207,8 +211,9 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 newMd5Map.put(clusterName, md5);
             } catch (Throwable e) {
                 // if get config failed, update the err log.
-                newErrorLogMap.put(clusterName, e.getMessage());
-                LOGGER.error("Failed to update cluster config={}, error={}", 
clusterName, e.getMessage());
+                String errMsg = 
Optional.ofNullable(e.getMessage()).orElse("Unknown error, please check logs");
+                newErrorLogMap.put(clusterName, errMsg);
+                LOGGER.error("Failed to update cluster config={}", 
clusterName, e);
             }
         });
 
@@ -217,7 +222,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
         sortClusterMd5Map = newMd5Map;
     }
 
-    private SortClusterConfig getConfigByClusterNameV2(
+    private SortClusterConfig getConfigByClusterName(
             String clusterName,
             List<SortTaskInfo> tasks,
             Map<String, List<StreamSinkEntity>> task2AllStreams,
@@ -225,19 +230,24 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
         List<SortTaskConfig> taskConfigs = tasks.stream()
                 .map(task -> {
-                    String taskName = task.getSortTaskName();
-                    String type = task.getSinkType();
-                    String dataNodeName = task.getDataNodeName();
-                    DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
-                    List<StreamSinkEntity> streams = 
task2AllStreams.get(taskName);
-
-                    return SortTaskConfig.builder()
-                            .name(taskName)
-                            .type(type)
-                            .idParams(this.parseIdParamsV2(streams))
-                            .sinkParams(this.parseSinkParamsV2(nodeInfo))
-                            .build();
+                    try {
+                        String taskName = task.getSortTaskName();
+                        String type = task.getSinkType();
+                        String dataNodeName = task.getDataNodeName();
+                        DataNodeInfo nodeInfo = 
task2DataNodeMap.get(dataNodeName);
+                        List<StreamSinkEntity> streams = 
task2AllStreams.get(taskName);
+                        return SortTaskConfig.builder()
+                                .name(taskName)
+                                .type(type)
+                                .idParams(this.parseIdParams(streams))
+                                .sinkParams(this.parseSinkParams(nodeInfo))
+                                .build();
+                    } catch (Exception e) {
+                        LOGGER.error("fail to parse sort task config of 
cluster={}", clusterName, e);
+                        return null;
+                    }
                 })
+                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
         return SortClusterConfig.builder()
@@ -246,18 +256,25 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 .build();
     }
 
-    private List<Map<String, String>> parseIdParamsV2(List<StreamSinkEntity> 
streams) {
+    private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> 
streams) {
         return streams.stream()
                 .map(streamSink -> {
-                    StreamSinkOperator operator = 
sinkOperatorFactory.getInstance(streamSink.getSinkType());
-                    List<String> fields = 
fieldMap.get(streamSink.getInlongGroupId());
-                    return operator.parse2IdParams(streamSink, fields);
+                    try {
+                        StreamSinkOperator operator = 
sinkOperatorFactory.getInstance(streamSink.getSinkType());
+                        List<String> fields = 
fieldMap.get(streamSink.getInlongGroupId());
+                        return operator.parse2IdParams(streamSink, fields);
+                    } catch (Exception e) {
+                        LOGGER.error("fail to parse id params of groupId={}, 
streamId={} name={}, type={}}",
+                                streamSink.getInlongGroupId(), 
streamSink.getInlongStreamId(),
+                                streamSink.getSinkName(), 
streamSink.getSinkType(), e);
+                        return null;
+                    }
                 })
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
 
-    private Map<String, String> parseSinkParamsV2(DataNodeInfo nodeInfo) {
+    private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
         DataNodeOperator operator = 
dataNodeOperatorFactory.getInstance(nodeInfo.getType());
         return operator.parse2SinkParams(nodeInfo);
     }

Reply via email to