This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch branch-1.9 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ff1af8c1a3db817eaa01c41356b2f11ee033cde2
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Thu Sep 21 12:52:14 2023 +0800

    [INLONG-8946][Manager] Optimize the audit ID method issued by the manager (#8947)
---
 .../service/core/impl/AuditServiceImpl.java | 32 ++++++++++++++--------
 .../resource/sort/DefaultSortConfigOperator.java | 9 +++---
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 36b7cb0043..86e4034c0c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -48,7 +48,6 @@ import org.apache.inlong.manager.service.core.AuditService; import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig; import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi; -import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.jdbc.SQL;
@@ -239,6 +238,7 @@ public class AuditServiceImpl implements AuditService { // for now, we use the first sink type only. // this is temporary behavior before multiple sinks in one stream is fully supported. 
String sinkNodeType = null; + String sourceNodeType = null; Integer sinkId = request.getSinkId(); StreamSinkEntity sinkEntity = null; List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId); @@ -253,10 +253,22 @@ public class AuditServiceImpl implements AuditService { sinkNodeType = sinkEntity.getSinkType(); } - Set<String> sinkAuditIds = Sets.newHashSet(getAuditId(sinkNodeType, true), getAuditId(sinkNodeType, false)); + InlongGroupEntity groupEntity = inlongGroupMapper.selectByGroupId(groupId); + List<StreamSourceEntity> sourceEntityList = sourceEntityMapper.selectByRelatedId(groupId, streamId, null); + if (CollectionUtils.isNotEmpty(sourceEntityList)) { + sourceNodeType = sourceEntityList.get(0).getSourceType(); + } + + Map<String, String> auditIdMap = new HashMap<>(); + auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); + auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType); // properly overwrite audit ids by role and stream config - request.setAuditIds(getAuditIds(groupId, streamId, sinkNodeType)); + request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType)); + + if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { + request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); + } List<AuditVO> result = new ArrayList<>(); AuditQuerySource querySource = AuditQuerySource.valueOf(auditQuerySource); @@ -275,7 +287,7 @@ public class AuditServiceImpl implements AuditService { vo.setDelay(((BigDecimal) s.get("totalDelay")).longValue()); return vo; }).collect(Collectors.toList()); - result.add(new AuditVO(auditId, auditSet, sinkAuditIds.contains(auditId) ? 
sinkNodeType : null)); + result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); } else if (AuditQuerySource.ELASTICSEARCH == querySource) { String index = String.format("%s_%s", request.getStartDate().replaceAll("-", ""), auditId); if (!elasticsearchApi.indexExists(index)) { @@ -294,8 +306,7 @@ public class AuditServiceImpl implements AuditService { vo.setDelay((long) ((ParsedSum) bucket.getAggregations().asList().get(1)).getValue()); return vo; }).collect(Collectors.toList()); - result.add(new AuditVO(auditId, auditSet, - auditId.equals(getAuditId(sinkNodeType, true)) ? sinkNodeType : null)); + result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); } } } else if (AuditQuerySource.CLICKHOUSE == querySource) { @@ -312,8 +323,7 @@ public class AuditServiceImpl implements AuditService { vo.setDelay(resultSet.getLong("total_delay")); auditSet.add(vo); } - result.add(new AuditVO(auditId, auditSet, - auditId.equals(getAuditId(sinkNodeType, true)) ? sinkNodeType : null)); + result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); } } } @@ -321,7 +331,7 @@ public class AuditServiceImpl implements AuditService { return aggregateByTimeDim(result, request.getTimeStaticsDim()); } - private List<String> getAuditIds(String groupId, String streamId, String sinkNodeType) { + private List<String> getAuditIds(String groupId, String streamId, String sourceNodeType, String sinkNodeType) { Set<String> auditSet = LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN) ? 
new HashSet<>(auditIdListForAdmin) : new HashSet<>(auditIdListForUser); @@ -330,10 +340,10 @@ public class AuditServiceImpl implements AuditService { if (sinkNodeType == null) { auditSet.add(getAuditId(ClusterType.DATAPROXY, true)); } else { - auditSet.add(getAuditId(sinkNodeType, false)); + auditSet.add(getAuditId(sinkNodeType, true)); InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId); if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) { - auditSet.add(getAuditId(sinkNodeType, true)); + auditSet.add(getAuditId(sourceNodeType, false)); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 587170e238..52ff3bb5e8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -124,16 +124,15 @@ public class DefaultSortConfigOperator implements SortConfigOperator { // build a stream info from the nodes and relations List<StreamSource> sources = sourceMap.get(streamId); List<StreamSink> sinks = sinkMap.get(streamId); - // get audit list by sink type - List<String> auditIds = new ArrayList<>(); + for (StreamSink sink : sinks) { - auditIds.add(auditService.getAuditId(sink.getSinkType(), false)); - auditIds.add(auditService.getAuditId(sink.getSinkType(), true)); + Map<String, Object> properties = sink.getProperties(); + properties.putIfAbsent("metrics.audit.key", auditService.getAuditId(sink.getSinkType(), true)); } for (StreamSource source : sources) { source.setFieldList(inlongStream.getFieldList()); Map<String, Object> properties = source.getProperties(); - properties.putIfAbsent("metrics.audit.key", 
String.join("&", auditIds)); + properties.putIfAbsent("metrics.audit.key", auditService.getAuditId(source.getSourceType(), false)); } List<NodeRelation> relations;