This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.10 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 26a3159a00f0959c007336bb892100f3935bf943 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Dec 7 13:05:39 2023 +0800 [INLONG-9435][Manager] Support querying audit data by audit ID and obtaining audit ID information (#9436) (cherry picked from commit e34bf1fde63eb0ad340f03847476d95ba4e2c1da) --- .../main/resources/mappers/AuditEntityMapper.xml | 6 +- .../audit/{AuditVO.java => AuditBaseResponse.java} | 32 +++++------ .../inlong/manager/pojo/audit/AuditInfo.java | 9 +++ .../inlong/manager/pojo/audit/AuditRequest.java | 10 +--- .../apache/inlong/manager/pojo/audit/AuditVO.java | 5 +- .../inlong/manager/service/core/AuditService.java | 3 + .../service/core/impl/AuditServiceImpl.java | 65 +++++++++++++++------- .../manager/web/controller/AuditController.java | 7 +++ 8 files changed, 90 insertions(+), 47 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml index dd582d0d7d..cca03f50ef 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml @@ -36,6 +36,8 @@ </resultMap> <resultMap id="SumByLogTsResultMap" type="java.util.Map"> + <result column="inlong_group_id" property="inlongGroupId" jdbcType="VARCHAR"/> + <result column="inlong_stream_id" property="inlongStreamId" jdbcType="VARCHAR"/> <result column="log_ts" property="logTs" jdbcType="VARCHAR"/> <result column="total" property="total" jdbcType="BIGINT"/> <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/> @@ -43,7 +45,7 @@ </resultMap> <select id="sumByLogTs" resultMap="SumByLogTsResultMap"> - select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size + select inlong_group_id, inlong_stream_id, date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size from ( select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay` from apache_inlong_audit.audit_data @@ -53,7 +55,7 @@ and log_ts >= #{sDate, jdbcType=VARCHAR} and log_ts < #{eDate, jdbcType=VARCHAR} ) as sub - group by log_ts + group by log_ts, inlong_group_id, inlong_stream_id order by log_ts </select> </mapper> \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java similarity index 69% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java index 68b8299c58..74da6aa46a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java @@ -20,27 +20,25 @@ package org.apache.inlong.manager.pojo.audit; import io.swagger.annotations.ApiModelProperty; import lombok.Data; -import java.util.List; - /** - * The VO of audit. + * Audit base info */ @Data -public class AuditVO { +public class AuditBaseResponse { + + @ApiModelProperty(value = "Audit log timestamp") + private Integer id; + + @ApiModelProperty(value = "Audit name") + private String name; + + @ApiModelProperty(value = "Audit type") + private String type; + + @ApiModelProperty(value = "is sent") + private Integer isSent; @ApiModelProperty(value = "Audit id") private String auditId; - @ApiModelProperty(value = "Audit set") - private List<AuditInfo> auditSet; - @ApiModelProperty(value = "Node type") - private String nodeType; - - public AuditVO() { - } - - public AuditVO(String auditId, List<AuditInfo> auditSet, String nodeType) { - this.auditId = auditId; - this.auditSet = auditSet; - this.nodeType = nodeType; - } + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java index a1ac46f9e6..8b15edb3ac 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java @@ -26,12 +26,21 @@ import lombok.Data; @Data public class AuditInfo { + @ApiModelProperty(value = "inlong group id") + private String inlongGroupId; + + @ApiModelProperty(value = "inlong stream id") + private String inlongStreamId; + @ApiModelProperty(value = "Audit log timestamp") private String logTs; + @ApiModelProperty(value = "Audit count") private long count; + @ApiModelProperty(value = "Audit delay") private long delay; + @ApiModelProperty(value = "Audit size") private long size; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java index 4023d40dc0..980b48c333 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java @@ -24,8 +24,6 @@ import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; -import javax.validation.constraints.NotBlank; - import java.util.List; /** @@ -36,15 +34,13 @@ import java.util.List; @ApiModel("Audit query request") public class AuditRequest { - @NotBlank(message = "inlongGroupId not be blank") - @ApiModelProperty(value = "inlong group id", required = true) + @ApiModelProperty(value = "inlong group id") private String inlongGroupId; - @NotBlank(message = "inlongStreamId not be blank") - @ApiModelProperty(value = "inlong stream id", required = true) + @ApiModelProperty(value = "inlong stream id") private String inlongStreamId; - @ApiModelProperty(value = "audit id list", required = true) + @ApiModelProperty(value = "audit id list") private List<String> auditIds; @ApiModelProperty(value = "sink id") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java index 68b8299c58..fa58ac17ed 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java @@ -30,6 +30,8 @@ public class AuditVO { @ApiModelProperty(value = "Audit id") private String auditId; + @ApiModelProperty(value = "Audit name") + private String auditName; @ApiModelProperty(value = "Audit set") private List<AuditInfo> auditSet; @ApiModelProperty(value = "Node type") @@ -38,8 +40,9 @@ public class AuditVO { public AuditVO() { } - public AuditVO(String auditId, List<AuditInfo> auditSet, String nodeType) { + public AuditVO(String auditId, String auditName, List<AuditInfo> auditSet, String nodeType) { this.auditId = auditId; + this.auditName = auditName; this.auditSet = auditSet; this.nodeType = nodeType; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java index 5068883b73..e37277aefb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.core; +import org.apache.inlong.manager.pojo.audit.AuditBaseResponse; import org.apache.inlong.manager.pojo.audit.AuditRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceResponse; @@ -37,6 +38,8 @@ public interface AuditService { */ List<AuditVO> listByCondition(AuditRequest request) throws Exception; + List<AuditBaseResponse> getAuditBases(); + /** * Get audit id by type and isSent. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 7ffdafb3b9..24478a3f39 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; +import org.apache.inlong.manager.pojo.audit.AuditBaseResponse; import org.apache.inlong.manager.pojo.audit.AuditInfo; import org.apache.inlong.manager.pojo.audit.AuditRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceRequest; @@ -113,6 +114,8 @@ public class AuditServiceImpl implements AuditService { private final Map<String, AuditBaseEntity> auditReceivedItemMap = new ConcurrentHashMap<>(); + private final Map<String, AuditBaseEntity> auditItemMap = new ConcurrentHashMap<>(); + // defaults to return all audit ids, can be overwritten in properties file // see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id @Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}") @@ -156,6 +159,7 @@ public class AuditServiceImpl implements AuditService { try { List<AuditBaseEntity> auditBaseEntities = auditBaseMapper.selectAll(); for (AuditBaseEntity auditBaseEntity : auditBaseEntities) { + auditItemMap.put(auditBaseEntity.getAuditId(), auditBaseEntity); String type = auditBaseEntity.getType(); if (auditBaseEntity.getIsSent() == 1) { auditSentItemMap.put(type, auditBaseEntity); @@ -252,30 +256,39 @@ public class AuditServiceImpl implements AuditService { if (sinkEntity != null) { sinkNodeType = sinkEntity.getSinkType(); } + Map<String, String> auditIdMap = new HashMap<>(); - InlongGroupEntity groupEntity = inlongGroupMapper.selectByGroupId(groupId); - List<StreamSourceEntity> sourceEntityList = sourceEntityMapper.selectByRelatedId(groupId, streamId, null); - if (CollectionUtils.isNotEmpty(sourceEntityList)) { - sourceNodeType = sourceEntityList.get(0).getSourceType(); - } + if (StringUtils.isNotBlank(groupId)) { + InlongGroupEntity groupEntity = inlongGroupMapper.selectByGroupId(groupId); + List<StreamSourceEntity> sourceEntityList = sourceEntityMapper.selectByRelatedId(groupId, streamId, null); + if (CollectionUtils.isNotEmpty(sourceEntityList)) { + sourceNodeType = sourceEntityList.get(0).getSourceType(); + } - Map<String, String> auditIdMap = new HashMap<>(); - auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType); + auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType); - if (CollectionUtils.isEmpty(request.getAuditIds())) { - // properly overwrite audit ids by role and stream config - if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { - auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); - request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); - } else { - auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType); - request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType)); + if (CollectionUtils.isEmpty(request.getAuditIds())) { + // properly overwrite audit ids by role and stream config + if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { + auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); + request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); + } else { + auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType); + request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType)); + } } + } else if (CollectionUtils.isEmpty(request.getAuditIds())) { + throw new BusinessException("audits id is empty"); } List<AuditVO> result = new ArrayList<>(); AuditQuerySource querySource = AuditQuerySource.valueOf(auditQuerySource); for (String auditId : request.getAuditIds()) { + AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId); + String auditName = ""; + if (auditBaseEntity != null) { + auditName = auditBaseEntity.getName(); + } if (AuditQuerySource.MYSQL == querySource) { String format = "%Y-%m-%d %H:%i:00"; // Support min agg at now @@ -285,13 +298,15 @@ public class AuditServiceImpl implements AuditService { groupId, streamId, auditId, request.getStartDate(), endDateStr, format); List<AuditInfo> auditSet = sumList.stream().map(s -> { AuditInfo vo = new AuditInfo(); + vo.setInlongGroupId((String) s.get("inlongGroupId")); + vo.setInlongStreamId((String) s.get("inlongStreamId")); vo.setLogTs((String) s.get("logTs")); vo.setCount(((BigDecimal) s.get("total")).longValue()); vo.setDelay(((BigDecimal) s.get("totalDelay")).longValue()); vo.setSize(((BigDecimal) s.get("totalSize")).longValue()); return vo; }).collect(Collectors.toList()); - result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); + result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null))); } else if (AuditQuerySource.ELASTICSEARCH == querySource) { String index = String.format("%s_%s", request.getStartDate().replaceAll("-", ""), auditId); if (!elasticsearchApi.indexExists(index)) { @@ -310,7 +325,7 @@ public class AuditServiceImpl implements AuditService { vo.setDelay((long) ((ParsedSum) bucket.getAggregations().asList().get(1)).getValue()); return vo; }).collect(Collectors.toList()); - result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); + result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null))); } } } else if (AuditQuerySource.CLICKHOUSE == querySource) { @@ -322,13 +337,15 @@ public class AuditServiceImpl implements AuditService { List<AuditInfo> auditSet = new ArrayList<>(); while (resultSet.next()) { AuditInfo vo = new AuditInfo(); + vo.setInlongGroupId(resultSet.getString("inlong_group_id")); + vo.setInlongStreamId(resultSet.getString("inlong_stream_id")); vo.setLogTs(resultSet.getString("log_ts")); vo.setCount(resultSet.getLong("total")); vo.setDelay(resultSet.getLong("total_delay")); vo.setSize(resultSet.getLong("total_size")); auditSet.add(vo); } - result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null))); + result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null))); } } } @@ -336,6 +353,12 @@ public class AuditServiceImpl implements AuditService { return aggregateByTimeDim(result, request.getTimeStaticsDim()); } + @Override + public List<AuditBaseResponse> getAuditBases() { + List<AuditBaseEntity> auditBaseEntityList = auditBaseMapper.selectAll(); + return CommonBeanUtils.copyListProperties(auditBaseEntityList, AuditBaseResponse::new); + } + private List<String> getAuditIds(String groupId, String streamId, String sourceNodeType, String sinkNodeType) { Set<String> auditSet = LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN) ? new HashSet<>(auditIdListForAdmin) @@ -420,9 +443,10 @@ public class AuditServiceImpl implements AuditService { .toString(); String sql = new SQL() - .SELECT("log_ts", "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size") + .SELECT("inlong_group_id", "inlong_stream_id", "log_ts", "sum(count) as total", + "sum(delay) as total_delay", "sum(size) as total_size") .FROM("(" + subQuery + ") as sub") - .GROUP_BY("log_ts") + .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id") .ORDER_BY("log_ts") .toString(); @@ -465,6 +489,7 @@ public class AuditServiceImpl implements AuditService { HashMap<String, AtomicLong> delayMap = new HashMap<>(); HashMap<String, AtomicLong> sizeMap = new HashMap<>(); statInfo.setAuditId(auditVO.getAuditId()); + statInfo.setAuditName(auditVO.getAuditName()); statInfo.setNodeType(auditVO.getNodeType()); for (AuditInfo auditInfo : auditVO.getAuditSet()) { String statKey = formatLogTime(auditInfo.getLogTs(), format); diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java index a7128085e6..9c8beb0a4b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.web.controller; +import org.apache.inlong.manager.pojo.audit.AuditBaseResponse; import org.apache.inlong.manager.pojo.audit.AuditRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceResponse; @@ -69,6 +70,12 @@ public class AuditController { return Response.success(auditService.updateAuditSource(request, LoginUserUtils.getLoginUser().getName())); } + @ApiOperation(value = "Get the audit base info") + @GetMapping("/audit/getAuditBases") + public Response<List<AuditBaseResponse>> getAuditBases() { + return Response.success(auditService.getAuditBases()); + } + @ApiOperation(value = "Get the audit source") @GetMapping("/audit/getSource") public Response<AuditSourceResponse> getAuditSource() {