This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ff1af8c1a3db817eaa01c41356b2f11ee033cde2
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Thu Sep 21 12:52:14 2023 +0800

    [INLONG-8946][Manager] Optimize the audit ID method issued by the manager (#8947)
---
 .../service/core/impl/AuditServiceImpl.java        | 32 ++++++++++++++--------
 .../resource/sort/DefaultSortConfigOperator.java   |  9 +++---
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 36b7cb0043..86e4034c0c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -48,7 +48,6 @@ import org.apache.inlong.manager.service.core.AuditService;
 import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
 import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
 
-import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ibatis.jdbc.SQL;
@@ -239,6 +238,7 @@ public class AuditServiceImpl implements AuditService {
         // for now, we use the first sink type only.
         // this is temporary behavior before multiple sinks in one stream is fully supported.
         String sinkNodeType = null;
+        String sourceNodeType = null;
         Integer sinkId = request.getSinkId();
         StreamSinkEntity sinkEntity = null;
         List<StreamSinkEntity> sinkEntityList = 
sinkEntityMapper.selectByRelatedId(groupId, streamId);
@@ -253,10 +253,22 @@ public class AuditServiceImpl implements AuditService {
             sinkNodeType = sinkEntity.getSinkType();
         }
 
-        Set<String> sinkAuditIds = Sets.newHashSet(getAuditId(sinkNodeType, 
true), getAuditId(sinkNodeType, false));
+        InlongGroupEntity groupEntity = 
inlongGroupMapper.selectByGroupId(groupId);
+        List<StreamSourceEntity> sourceEntityList = 
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
+        if (CollectionUtils.isNotEmpty(sourceEntityList)) {
+            sourceNodeType = sourceEntityList.get(0).getSourceType();
+        }
+
+        Map<String, String> auditIdMap = new HashMap<>();
+        auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
+        auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
 
         // properly overwrite audit ids by role and stream config
-        request.setAuditIds(getAuditIds(groupId, streamId, sinkNodeType));
+        request.setAuditIds(getAuditIds(groupId, streamId, null, 
sinkNodeType));
+
+        if 
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+            request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, 
sinkNodeType));
+        }
 
         List<AuditVO> result = new ArrayList<>();
         AuditQuerySource querySource = 
AuditQuerySource.valueOf(auditQuerySource);
@@ -275,7 +287,7 @@ public class AuditServiceImpl implements AuditService {
                     vo.setDelay(((BigDecimal) 
s.get("totalDelay")).longValue());
                     return vo;
                 }).collect(Collectors.toList());
-                result.add(new AuditVO(auditId, auditSet, 
sinkAuditIds.contains(auditId) ? sinkNodeType : null));
+                result.add(new AuditVO(auditId, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
             } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                 String index = String.format("%s_%s", 
request.getStartDate().replaceAll("-", ""), auditId);
                 if (!elasticsearchApi.indexExists(index)) {
@@ -294,8 +306,7 @@ public class AuditServiceImpl implements AuditService {
                             vo.setDelay((long) ((ParsedSum) 
bucket.getAggregations().asList().get(1)).getValue());
                             return vo;
                         }).collect(Collectors.toList());
-                        result.add(new AuditVO(auditId, auditSet,
-                                auditId.equals(getAuditId(sinkNodeType, true)) 
? sinkNodeType : null));
+                        result.add(new AuditVO(auditId, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
                     }
                 }
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
@@ -312,8 +323,7 @@ public class AuditServiceImpl implements AuditService {
                         vo.setDelay(resultSet.getLong("total_delay"));
                         auditSet.add(vo);
                     }
-                    result.add(new AuditVO(auditId, auditSet,
-                            auditId.equals(getAuditId(sinkNodeType, true)) ? 
sinkNodeType : null));
+                    result.add(new AuditVO(auditId, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
                 }
             }
         }
@@ -321,7 +331,7 @@ public class AuditServiceImpl implements AuditService {
         return aggregateByTimeDim(result, request.getTimeStaticsDim());
     }
 
-    private List<String> getAuditIds(String groupId, String streamId, String 
sinkNodeType) {
+    private List<String> getAuditIds(String groupId, String streamId, String 
sourceNodeType, String sinkNodeType) {
         Set<String> auditSet = 
LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)
                 ? new HashSet<>(auditIdListForAdmin)
                 : new HashSet<>(auditIdListForUser);
@@ -330,10 +340,10 @@ public class AuditServiceImpl implements AuditService {
         if (sinkNodeType == null) {
             auditSet.add(getAuditId(ClusterType.DATAPROXY, true));
         } else {
-            auditSet.add(getAuditId(sinkNodeType, false));
+            auditSet.add(getAuditId(sinkNodeType, true));
             InlongGroupEntity inlongGroup = 
inlongGroupMapper.selectByGroupId(groupId);
             if 
(InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
-                auditSet.add(getAuditId(sinkNodeType, true));
+                auditSet.add(getAuditId(sourceNodeType, false));
             }
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 587170e238..52ff3bb5e8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -124,16 +124,15 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             // build a stream info from the nodes and relations
             List<StreamSource> sources = sourceMap.get(streamId);
             List<StreamSink> sinks = sinkMap.get(streamId);
-            // get audit list by sink type
-            List<String> auditIds = new ArrayList<>();
+
             for (StreamSink sink : sinks) {
-                auditIds.add(auditService.getAuditId(sink.getSinkType(), 
false));
-                auditIds.add(auditService.getAuditId(sink.getSinkType(), 
true));
+                Map<String, Object> properties = sink.getProperties();
+                properties.putIfAbsent("metrics.audit.key", 
auditService.getAuditId(sink.getSinkType(), true));
             }
             for (StreamSource source : sources) {
                 source.setFieldList(inlongStream.getFieldList());
                 Map<String, Object> properties = source.getProperties();
-                properties.putIfAbsent("metrics.audit.key", String.join("&", 
auditIds));
+                properties.putIfAbsent("metrics.audit.key", 
auditService.getAuditId(source.getSourceType(), false));
             }
             List<NodeRelation> relations;
 

Reply via email to