This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new de894ed9d  [INLONG-6827][Manager] Optimize the config managerment of SortStandalone (#6828)
de894ed9d is described below

commit de894ed9d56720052b66721bbe421e1bc0b263ae
Author: vernedeng <deng...@pku.edu.cn>
AuthorDate: Wed Dec 14 12:39:05 2022 +0800

    [INLONG-6827][Manager] Optimize the config managerment of SortStandalone (#6828)
---
 .../service/core/impl/SortClusterServiceImpl.java | 69 ++++++++++++++--------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index e55aa3ea8..f3541316f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -107,9 +108,9 @@ public class SortClusterServiceImpl implements SortClusterService {
     public void reload() {
         LOGGER.debug("start to reload sort config");
         try {
-            reloadAllClusterConfigV2();
+            reloadAllClusterConfig();
         } catch (Throwable t) {
-            LOGGER.error(t.getMessage(), t);
+            LOGGER.error("fail to reload cluster config", t);
         }
         LOGGER.debug("end to reload config");
     }
@@ -161,7 +162,7 @@ public class SortClusterServiceImpl implements SortClusterService {
                 .build();
     }
 
-    private void reloadAllClusterConfigV2() {
+    private void reloadAllClusterConfig() {
         // load all fields info
         List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
         fieldMap = new HashMap<>();
@@ -174,13 +175,16 @@ public class SortClusterServiceImpl implements SortClusterService {
         // get all task under a given cluster, has been reduced into cluster and task.
         List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
         Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
-                .filter(dto -> dto.getSortClusterName() != null)
+                .filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName())
+                        && StringUtils.isNotBlank(dto.getSortTaskName())
+                        && StringUtils.isNotBlank(dto.getDataNodeName())
+                        && StringUtils.isNotBlank(dto.getSinkType()))
                 .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
 
         // get all stream sinks
         Map<String, List<StreamSinkEntity>> task2AllStreams = sinkEntities.stream()
                 .filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName()))
-                .collect(Collectors.groupingBy(StreamSinkEntity::getSinkName));
+                .collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName));
 
         // get all data nodes and group by node name
         List<DataNodeEntity> dataNodeEntities = sortConfigLoader.loadAllDataNodeEntity();
@@ -199,7 +203,7 @@ public class SortClusterServiceImpl implements SortClusterService {
 
         clusterTaskMap.forEach((clusterName, taskList) -> {
             try {
-                SortClusterConfig config = this.getConfigByClusterNameV2(clusterName,
+                SortClusterConfig config = this.getConfigByClusterName(clusterName,
                         taskList, task2AllStreams, task2DataNodeMap);
                 String jsonStr = GSON.toJson(config);
                 String md5 = DigestUtils.md5Hex(jsonStr);
@@ -207,8 +211,9 @@ public class SortClusterServiceImpl implements SortClusterService {
                 newMd5Map.put(clusterName, md5);
             } catch (Throwable e) {
                 // if get config failed, update the err log.
-                newErrorLogMap.put(clusterName, e.getMessage());
-                LOGGER.error("Failed to update cluster config={}, error={}", clusterName, e.getMessage());
+                String errMsg = Optional.ofNullable(e.getMessage()).orElse("Unknown error, please check logs");
+                newErrorLogMap.put(clusterName, errMsg);
+                LOGGER.error("Failed to update cluster config={}", clusterName, e);
             }
         });
 
@@ -217,7 +222,7 @@ public class SortClusterServiceImpl implements SortClusterService {
         sortClusterMd5Map = newMd5Map;
     }
 
-    private SortClusterConfig getConfigByClusterNameV2(
+    private SortClusterConfig getConfigByClusterName(
             String clusterName,
             List<SortTaskInfo> tasks,
             Map<String, List<StreamSinkEntity>> task2AllStreams,
@@ -225,19 +230,24 @@ public class SortClusterServiceImpl implements SortClusterService {
 
         List<SortTaskConfig> taskConfigs = tasks.stream()
                 .map(task -> {
-                    String taskName = task.getSortTaskName();
-                    String type = task.getSinkType();
-                    String dataNodeName = task.getDataNodeName();
-                    DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
-                    List<StreamSinkEntity> streams = task2AllStreams.get(taskName);
-
-                    return SortTaskConfig.builder()
-                            .name(taskName)
-                            .type(type)
-                            .idParams(this.parseIdParamsV2(streams))
-                            .sinkParams(this.parseSinkParamsV2(nodeInfo))
-                            .build();
+                    try {
+                        String taskName = task.getSortTaskName();
+                        String type = task.getSinkType();
+                        String dataNodeName = task.getDataNodeName();
+                        DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
+                        List<StreamSinkEntity> streams = task2AllStreams.get(taskName);
+                        return SortTaskConfig.builder()
+                                .name(taskName)
+                                .type(type)
+                                .idParams(this.parseIdParams(streams))
+                                .sinkParams(this.parseSinkParams(nodeInfo))
+                                .build();
+                    } catch (Exception e) {
+                        LOGGER.error("fail to parse sort task config of cluster={}", clusterName, e);
+                        return null;
+                    }
                 })
+                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
         return SortClusterConfig.builder()
@@ -246,18 +256,25 @@ public class SortClusterServiceImpl implements SortClusterService {
                 .build();
     }
 
-    private List<Map<String, String>> parseIdParamsV2(List<StreamSinkEntity> streams) {
+    private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> streams) {
         return streams.stream()
                 .map(streamSink -> {
-                    StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
-                    List<String> fields = fieldMap.get(streamSink.getInlongGroupId());
-                    return operator.parse2IdParams(streamSink, fields);
+                    try {
+                        StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
+                        List<String> fields = fieldMap.get(streamSink.getInlongGroupId());
+                        return operator.parse2IdParams(streamSink, fields);
+                    } catch (Exception e) {
+                        LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}",
+                                streamSink.getInlongGroupId(), streamSink.getInlongStreamId(),
+                                streamSink.getSinkName(), streamSink.getSinkType(), e);
+                        return null;
+                    }
                 })
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
 
-    private Map<String, String> parseSinkParamsV2(DataNodeInfo nodeInfo) {
+    private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
         DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType());
         return operator.parse2SinkParams(nodeInfo);
     }
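
Editor's note: the recurring pattern in this change is per-element error isolation inside a stream pipeline. Each parse step is wrapped in its own try/catch, a failure is logged and mapped to null, and a trailing filter(Objects::nonNull) drops it, so one bad task or sink no longer aborts the whole cluster reload; Optional.ofNullable supplies a fallback message for exceptions without one. Below is a minimal, self-contained Java sketch of that pattern only. The class FailSoftMappingSketch and its parse method are illustrative placeholders, not part of the InLong code base.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.Optional;
    import java.util.stream.Collectors;

    public class FailSoftMappingSketch {

        // Hypothetical per-element parse that may throw for bad input.
        private static String parse(String raw) {
            if (raw == null || raw.isEmpty()) {
                throw new IllegalArgumentException("blank config entry");
            }
            return raw.trim().toLowerCase();
        }

        public static void main(String[] args) {
            List<String> rawEntries = Arrays.asList("TaskA", "", "TaskB", null);

            // Wrap each element's parse in its own try/catch, map failures to null,
            // then drop the nulls so one bad entry cannot abort the whole batch.
            List<String> parsed = rawEntries.stream()
                    .map(raw -> {
                        try {
                            return parse(raw);
                        } catch (Exception e) {
                            // Optional.ofNullable guards against exceptions that carry no message.
                            String errMsg = Optional.ofNullable(e.getMessage())
                                    .orElse("Unknown error, please check logs");
                            System.err.println("skip bad entry: " + errMsg);
                            return null;
                        }
                    })
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

            System.out.println(parsed); // prints [taska, taskb]
        }
    }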