This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e2f403d833 [INLONG-9351][Manager] Support querying audit data size (#9352) e2f403d833 is described below commit e2f403d833e296d33270e3f5f6d99d1b5a3dab34 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Mon Dec 4 20:18:09 2023 +0800 [INLONG-9351][Manager] Support querying audit data size (#9352) --- .../main/resources/mappers/AuditEntityMapper.xml | 2 +- .../inlong/manager/pojo/audit/AuditInfo.java | 5 ++++- .../service/core/impl/AuditServiceImpl.java | 26 +++++++++++++++------- .../service/core/impl/AuditServiceTest.java | 4 ++-- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml index 5722857ffa..8c093ebc4b 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml @@ -42,7 +42,7 @@ </resultMap> <select id="sumByLogTs" resultMap="SumByLogTsResultMap"> - select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay + select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size from ( select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay` from apache_inlong_audit.audit_data diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java index 7a13088885..a1ac46f9e6 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java +++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java @@ -32,13 +32,16 @@ public class AuditInfo { private long count; @ApiModelProperty(value = "Audit delay") private long delay; + @ApiModelProperty(value = "Audit size") + private long size; public AuditInfo() { } - public AuditInfo(String logTs, long count, long delay) { + public AuditInfo(String logTs, long count, long delay, long size) { this.logTs = logTs; this.count = count; this.delay = delay; + this.size = size; } } \ No newline at end of file diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 0d8cf1cc94..7ffdafb3b9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -262,13 +262,15 @@ public class AuditServiceImpl implements AuditService { Map<String, String> auditIdMap = new HashMap<>(); auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType); - // properly overwrite audit ids by role and stream config - if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { - auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); - request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); - } else { - auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType); - request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType)); + if (CollectionUtils.isEmpty(request.getAuditIds())) { + // properly overwrite audit ids by role and stream config + if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { + auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); + request.setAuditIds(getAuditIds(groupId, 
streamId, sourceNodeType, sinkNodeType)); + } else { + auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType); + request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType)); + } } List<AuditVO> result = new ArrayList<>(); @@ -286,6 +288,7 @@ public class AuditServiceImpl implements AuditService { vo.setLogTs((String) s.get("logTs")); vo.setCount(((BigDecimal) s.get("total")).longValue()); vo.setDelay(((BigDecimal) s.get("totalDelay")).longValue()); + vo.setSize(((BigDecimal) s.get("totalSize")).longValue()); return vo; }).collect(Collectors.toList()); result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); @@ -322,6 +325,7 @@ public class AuditServiceImpl implements AuditService { vo.setLogTs(resultSet.getString("log_ts")); vo.setCount(resultSet.getLong("total")); vo.setDelay(resultSet.getLong("total_delay")); + vo.setSize(resultSet.getLong("total_size")); auditSet.add(vo); } result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); @@ -416,7 +420,7 @@ public class AuditServiceImpl implements AuditService { .toString(); String sql = new SQL() - .SELECT("log_ts", "sum(count) as total", "sum(delay) as total_delay") + .SELECT("log_ts", "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size") .FROM("(" + subQuery + ") as sub") .GROUP_BY("log_ts") .ORDER_BY("log_ts") @@ -459,6 +463,7 @@ public class AuditServiceImpl implements AuditService { AuditVO statInfo = new AuditVO(); HashMap<String, AtomicLong> countMap = new HashMap<>(); HashMap<String, AtomicLong> delayMap = new HashMap<>(); + HashMap<String, AtomicLong> sizeMap = new HashMap<>(); statInfo.setAuditId(auditVO.getAuditId()); statInfo.setNodeType(auditVO.getNodeType()); for (AuditInfo auditInfo : auditVO.getAuditSet()) { @@ -472,8 +477,12 @@ public class AuditServiceImpl implements AuditService { if (delayMap.get(statKey) == null) { delayMap.put(statKey, new AtomicLong(0)); } + if (sizeMap.get(statKey) == null) { + 
sizeMap.put(statKey, new AtomicLong(0)); + } countMap.get(statKey).addAndGet(auditInfo.getCount()); delayMap.get(statKey).addAndGet(auditInfo.getDelay()); + sizeMap.get(statKey).addAndGet(auditInfo.getSize()); } List<AuditInfo> auditInfoList = new LinkedList<>(); @@ -483,6 +492,7 @@ public class AuditServiceImpl implements AuditService { long count = entry.getValue().get(); auditInfoStat.setCount(entry.getValue().get()); auditInfoStat.setDelay(count == 0 ? 0 : delayMap.get(entry.getKey()).get() / count); + auditInfoStat.setSize(count == 0 ? 0 : sizeMap.get(entry.getKey()).get() / count); auditInfoList.add(auditInfoStat); } statInfo.setAuditSet(auditInfoList); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java index 7909b0e012..28b8636969 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java @@ -52,8 +52,8 @@ class AuditServiceTest extends ServiceBaseTest { List<AuditVO> result = new ArrayList<>(); AuditVO auditVO = new AuditVO(); auditVO.setAuditId("3"); - auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00", 123L, 12L), - new AuditInfo("2022-01-01 00:01:00", 124L, 12L))); + auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00", 123L, 12L, 12L), + new AuditInfo("2022-01-01 00:01:00", 124L, 12L, 12L))); result.add(auditVO); Assertions.assertNotNull(result); // close real test for testQueryFromMySQL due to date_format function not support in h2