This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e2f403d833 [INLONG-9351][Manager] Support querying audit data size (#9352)
e2f403d833 is described below

commit e2f403d833e296d33270e3f5f6d99d1b5a3dab34
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon Dec 4 20:18:09 2023 +0800

    [INLONG-9351][Manager] Support querying audit data size (#9352)
---
 .../main/resources/mappers/AuditEntityMapper.xml   |  2 +-
 .../inlong/manager/pojo/audit/AuditInfo.java       |  5 ++++-
 .../service/core/impl/AuditServiceImpl.java        | 26 +++++++++++++++-------
 .../service/core/impl/AuditServiceTest.java        |  4 ++--
 4 files changed, 25 insertions(+), 12 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index 5722857ffa..8c093ebc4b 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -42,7 +42,7 @@
     </resultMap>
 
     <select id="sumByLogTs" resultMap="SumByLogTsResultMap">
-        select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, 
sum(`count`) as total, sum(`delay`) as total_delay
+        select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, 
sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
         from (
             select distinct ip, docker_id, thread_id, sdk_ts, packet_id, 
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
             from apache_inlong_audit.audit_data
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
index 7a13088885..a1ac46f9e6 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
@@ -32,13 +32,16 @@ public class AuditInfo {
     private long count;
     @ApiModelProperty(value = "Audit delay")
     private long delay;
+    @ApiModelProperty(value = "Audit size")
+    private long size;
 
     public AuditInfo() {
     }
 
-    public AuditInfo(String logTs, long count, long delay) {
+    public AuditInfo(String logTs, long count, long delay, long size) {
         this.logTs = logTs;
         this.count = count;
         this.delay = delay;
+        this.size = size;
     }
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 0d8cf1cc94..7ffdafb3b9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -262,13 +262,15 @@ public class AuditServiceImpl implements AuditService {
         Map<String, String> auditIdMap = new HashMap<>();
         auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
 
-        // properly overwrite audit ids by role and stream config
-        if 
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
-            auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
-            request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, 
sinkNodeType));
-        } else {
-            auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
-            request.setAuditIds(getAuditIds(groupId, streamId, null, 
sinkNodeType));
+        if (CollectionUtils.isEmpty(request.getAuditIds())) {
+            // properly overwrite audit ids by role and stream config
+            if 
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+                auditIdMap.put(getAuditId(sourceNodeType, false), 
sourceNodeType);
+                request.setAuditIds(getAuditIds(groupId, streamId, 
sourceNodeType, sinkNodeType));
+            } else {
+                auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
+                request.setAuditIds(getAuditIds(groupId, streamId, null, 
sinkNodeType));
+            }
         }
 
         List<AuditVO> result = new ArrayList<>();
@@ -286,6 +288,7 @@ public class AuditServiceImpl implements AuditService {
                     vo.setLogTs((String) s.get("logTs"));
                     vo.setCount(((BigDecimal) s.get("total")).longValue());
                     vo.setDelay(((BigDecimal) 
s.get("totalDelay")).longValue());
+                    vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
                     return vo;
                 }).collect(Collectors.toList());
                 result.add(new AuditVO(auditId, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
@@ -322,6 +325,7 @@ public class AuditServiceImpl implements AuditService {
                         vo.setLogTs(resultSet.getString("log_ts"));
                         vo.setCount(resultSet.getLong("total"));
                         vo.setDelay(resultSet.getLong("total_delay"));
+                        vo.setSize(resultSet.getLong("total_size"));
                         auditSet.add(vo);
                     }
                     result.add(new AuditVO(auditId, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
@@ -416,7 +420,7 @@ public class AuditServiceImpl implements AuditService {
                 .toString();
 
         String sql = new SQL()
-                .SELECT("log_ts", "sum(count) as total", "sum(delay) as 
total_delay")
+                .SELECT("log_ts", "sum(count) as total", "sum(delay) as 
total_delay", "sum(size) as total_size")
                 .FROM("(" + subQuery + ") as sub")
                 .GROUP_BY("log_ts")
                 .ORDER_BY("log_ts")
@@ -459,6 +463,7 @@ public class AuditServiceImpl implements AuditService {
             AuditVO statInfo = new AuditVO();
             HashMap<String, AtomicLong> countMap = new HashMap<>();
             HashMap<String, AtomicLong> delayMap = new HashMap<>();
+            HashMap<String, AtomicLong> sizeMap = new HashMap<>();
             statInfo.setAuditId(auditVO.getAuditId());
             statInfo.setNodeType(auditVO.getNodeType());
             for (AuditInfo auditInfo : auditVO.getAuditSet()) {
@@ -472,8 +477,12 @@ public class AuditServiceImpl implements AuditService {
                 if (delayMap.get(statKey) == null) {
                     delayMap.put(statKey, new AtomicLong(0));
                 }
+                if (sizeMap.get(statKey) == null) {
+                    sizeMap.put(statKey, new AtomicLong(0));
+                }
                 countMap.get(statKey).addAndGet(auditInfo.getCount());
                 delayMap.get(statKey).addAndGet(auditInfo.getDelay());
+                sizeMap.get(statKey).addAndGet(auditInfo.getSize());
             }
 
             List<AuditInfo> auditInfoList = new LinkedList<>();
@@ -483,6 +492,7 @@ public class AuditServiceImpl implements AuditService {
                 long count = entry.getValue().get();
                 auditInfoStat.setCount(entry.getValue().get());
                 auditInfoStat.setDelay(count == 0 ? 0 : 
delayMap.get(entry.getKey()).get() / count);
+                auditInfoStat.setSize(count == 0 ? 0 : 
sizeMap.get(entry.getKey()).get() / count);
                 auditInfoList.add(auditInfoStat);
             }
             statInfo.setAuditSet(auditInfoList);
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
index 7909b0e012..28b8636969 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
@@ -52,8 +52,8 @@ class AuditServiceTest extends ServiceBaseTest {
         List<AuditVO> result = new ArrayList<>();
         AuditVO auditVO = new AuditVO();
         auditVO.setAuditId("3");
-        auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00", 
123L, 12L),
-                new AuditInfo("2022-01-01 00:01:00", 124L, 12L)));
+        auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00", 
123L, 12L, 12L),
+                new AuditInfo("2022-01-01 00:01:00", 124L, 12L, 12L)));
         result.add(auditVO);
         Assertions.assertNotNull(result);
         // close real test for testQueryFromMySQL due to date_format function 
not support in h2

Reply via email to