This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 57bafd0d416bfdee3cf76a2e134399954ce5c016
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Dec 7 13:05:39 2023 +0800

    [INLONG-9435][Manager] Support querying audit data by audit ID and obtaining audit ID information (#9436)
    
    (cherry picked from commit e34bf1fde63eb0ad340f03847476d95ba4e2c1da)
---
 .../main/resources/mappers/AuditEntityMapper.xml   |  6 +-
 .../audit/{AuditVO.java => AuditBaseResponse.java} | 32 +++++------
 .../inlong/manager/pojo/audit/AuditInfo.java       |  9 +++
 .../inlong/manager/pojo/audit/AuditRequest.java    | 10 +---
 .../apache/inlong/manager/pojo/audit/AuditVO.java  |  5 +-
 .../inlong/manager/service/core/AuditService.java  |  3 +
 .../service/core/impl/AuditServiceImpl.java        | 65 +++++++++++++++-------
 .../manager/web/controller/AuditController.java    |  7 +++
 8 files changed, 90 insertions(+), 47 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index dd582d0d7d..cca03f50ef 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -36,6 +36,8 @@
     </resultMap>
 
     <resultMap id="SumByLogTsResultMap" type="java.util.Map">
+        <result column="inlong_group_id" property="inlongGroupId" jdbcType="VARCHAR"/>
+        <result column="inlong_stream_id" property="inlongStreamId" jdbcType="VARCHAR"/>
         <result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
         <result column="total" property="total" jdbcType="BIGINT"/>
         <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
@@ -43,7 +45,7 @@
     </resultMap>
 
     <select id="sumByLogTs" resultMap="SumByLogTsResultMap">
-        select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
+        select inlong_group_id, inlong_stream_id, date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
         from (
             select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
             from apache_inlong_audit.audit_data
@@ -53,7 +55,7 @@
               and log_ts &gt;= #{sDate, jdbcType=VARCHAR}
               and log_ts &lt; #{eDate, jdbcType=VARCHAR}
             ) as sub
-        group by log_ts
+        group by log_ts, inlong_group_id, inlong_stream_id
         order by log_ts
     </select>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java
similarity index 69%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java
index 68b8299c58..74da6aa46a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java
@@ -20,27 +20,25 @@ package org.apache.inlong.manager.pojo.audit;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
-import java.util.List;
-
 /**
- * The VO of audit.
+ * Audit base info
  */
 @Data
-public class AuditVO {
+public class AuditBaseResponse {
+
+    @ApiModelProperty(value = "Audit log timestamp")
+    private Integer id;
+
+    @ApiModelProperty(value = "Audit name")
+    private String name;
+
+    @ApiModelProperty(value = "Audit type")
+    private String type;
+
+    @ApiModelProperty(value = "is sent")
+    private Integer isSent;
 
     @ApiModelProperty(value = "Audit id")
     private String auditId;
-    @ApiModelProperty(value = "Audit set")
-    private List<AuditInfo> auditSet;
-    @ApiModelProperty(value = "Node type")
-    private String nodeType;
-
-    public AuditVO() {
-    }
-
-    public AuditVO(String auditId, List<AuditInfo> auditSet, String nodeType) {
-        this.auditId = auditId;
-        this.auditSet = auditSet;
-        this.nodeType = nodeType;
-    }
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
index a1ac46f9e6..8b15edb3ac 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
@@ -26,12 +26,21 @@ import lombok.Data;
 @Data
 public class AuditInfo {
 
+    @ApiModelProperty(value = "inlong group id")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty(value = "Audit log timestamp")
     private String logTs;
+
     @ApiModelProperty(value = "Audit count")
     private long count;
+
     @ApiModelProperty(value = "Audit delay")
     private long delay;
+
     @ApiModelProperty(value = "Audit size")
     private long size;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
index 4023d40dc0..980b48c333 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
@@ -24,8 +24,6 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
-import javax.validation.constraints.NotBlank;
-
 import java.util.List;
 
 /**
@@ -36,15 +34,13 @@ import java.util.List;
 @ApiModel("Audit query request")
 public class AuditRequest {
 
-    @NotBlank(message = "inlongGroupId not be blank")
-    @ApiModelProperty(value = "inlong group id", required = true)
+    @ApiModelProperty(value = "inlong group id")
     private String inlongGroupId;
 
-    @NotBlank(message = "inlongStreamId not be blank")
-    @ApiModelProperty(value = "inlong stream id", required = true)
+    @ApiModelProperty(value = "inlong stream id")
     private String inlongStreamId;
 
-    @ApiModelProperty(value = "audit id list", required = true)
+    @ApiModelProperty(value = "audit id list")
     private List<String> auditIds;
 
     @ApiModelProperty(value = "sink id")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
index 68b8299c58..fa58ac17ed 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditVO.java
@@ -30,6 +30,8 @@ public class AuditVO {
 
     @ApiModelProperty(value = "Audit id")
     private String auditId;
+    @ApiModelProperty(value = "Audit name")
+    private String auditName;
     @ApiModelProperty(value = "Audit set")
     private List<AuditInfo> auditSet;
     @ApiModelProperty(value = "Node type")
@@ -38,8 +40,9 @@ public class AuditVO {
     public AuditVO() {
     }
 
-    public AuditVO(String auditId, List<AuditInfo> auditSet, String nodeType) {
+    public AuditVO(String auditId, String auditName, List<AuditInfo> auditSet, String nodeType) {
         this.auditId = auditId;
+        this.auditName = auditName;
         this.auditSet = auditSet;
         this.nodeType = nodeType;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index 5068883b73..e37277aefb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.core;
 
+import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
@@ -37,6 +38,8 @@ public interface AuditService {
      */
     List<AuditVO> listByCondition(AuditRequest request) throws Exception;
 
+    List<AuditBaseResponse> getAuditBases();
+
     /**
      * Get audit id by type and isSent.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 7ffdafb3b9..24478a3f39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
 import org.apache.inlong.manager.pojo.audit.AuditInfo;
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
@@ -113,6 +114,8 @@ public class AuditServiceImpl implements AuditService {
 
     private final Map<String, AuditBaseEntity> auditReceivedItemMap = new ConcurrentHashMap<>();
 
+    private final Map<String, AuditBaseEntity> auditItemMap = new ConcurrentHashMap<>();
+
     // defaults to return all audit ids, can be overwritten in properties file
     // see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id
     @Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
@@ -156,6 +159,7 @@ public class AuditServiceImpl implements AuditService {
         try {
             List<AuditBaseEntity> auditBaseEntities = auditBaseMapper.selectAll();
             for (AuditBaseEntity auditBaseEntity : auditBaseEntities) {
+                auditItemMap.put(auditBaseEntity.getAuditId(), auditBaseEntity);
                 String type = auditBaseEntity.getType();
                 if (auditBaseEntity.getIsSent() == 1) {
                     auditSentItemMap.put(type, auditBaseEntity);
@@ -252,30 +256,39 @@ public class AuditServiceImpl implements AuditService {
         if (sinkEntity != null) {
             sinkNodeType = sinkEntity.getSinkType();
         }
+        Map<String, String> auditIdMap = new HashMap<>();
 
-        InlongGroupEntity groupEntity = inlongGroupMapper.selectByGroupId(groupId);
-        List<StreamSourceEntity> sourceEntityList = sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
-        if (CollectionUtils.isNotEmpty(sourceEntityList)) {
-            sourceNodeType = sourceEntityList.get(0).getSourceType();
-        }
+        if (StringUtils.isNotBlank(groupId)) {
+            InlongGroupEntity groupEntity = inlongGroupMapper.selectByGroupId(groupId);
+            List<StreamSourceEntity> sourceEntityList = sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
+            if (CollectionUtils.isNotEmpty(sourceEntityList)) {
+                sourceNodeType = sourceEntityList.get(0).getSourceType();
+            }
 
-        Map<String, String> auditIdMap = new HashMap<>();
-        auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
+            auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
 
-        if (CollectionUtils.isEmpty(request.getAuditIds())) {
-            // properly overwrite audit ids by role and stream config
-            if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
-                auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
-                request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
-            } else {
-                auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
-                request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType));
+            if (CollectionUtils.isEmpty(request.getAuditIds())) {
+                // properly overwrite audit ids by role and stream config
+                if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+                    auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
+                    request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
+                } else {
+                    auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
+                    request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType));
+                }
             }
+        } else if (CollectionUtils.isEmpty(request.getAuditIds())) {
+            throw new BusinessException("audits id is empty");
         }
 
         List<AuditVO> result = new ArrayList<>();
         AuditQuerySource querySource = AuditQuerySource.valueOf(auditQuerySource);
         for (String auditId : request.getAuditIds()) {
+            AuditBaseEntity auditBaseEntity = auditItemMap.get(auditId);
+            String auditName = "";
+            if (auditBaseEntity != null) {
+                auditName = auditBaseEntity.getName();
+            }
             if (AuditQuerySource.MYSQL == querySource) {
                 String format = "%Y-%m-%d %H:%i:00";
                 // Support min agg at now
@@ -285,13 +298,15 @@ public class AuditServiceImpl implements AuditService {
                         groupId, streamId, auditId, request.getStartDate(), endDateStr, format);
                 List<AuditInfo> auditSet = sumList.stream().map(s -> {
                     AuditInfo vo = new AuditInfo();
+                    vo.setInlongGroupId((String) s.get("inlongGroupId"));
+                    vo.setInlongStreamId((String) s.get("inlongStreamId"));
                     vo.setLogTs((String) s.get("logTs"));
                     vo.setCount(((BigDecimal) s.get("total")).longValue());
                     vo.setDelay(((BigDecimal) s.get("totalDelay")).longValue());
                     vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
                     return vo;
                 }).collect(Collectors.toList());
-                result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null)));
+                result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null)));
             } else if (AuditQuerySource.ELASTICSEARCH == querySource) {
                 String index = String.format("%s_%s", request.getStartDate().replaceAll("-", ""), auditId);
                 if (!elasticsearchApi.indexExists(index)) {
@@ -310,7 +325,7 @@ public class AuditServiceImpl implements AuditService {
                             vo.setDelay((long) ((ParsedSum) bucket.getAggregations().asList().get(1)).getValue());
                             return vo;
                         }).collect(Collectors.toList());
-                        result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null)));
+                        result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null)));
                     }
                 }
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
@@ -322,13 +337,15 @@ public class AuditServiceImpl implements AuditService {
                     List<AuditInfo> auditSet = new ArrayList<>();
                     while (resultSet.next()) {
                         AuditInfo vo = new AuditInfo();
+                        vo.setInlongGroupId(resultSet.getString("inlong_group_id"));
+                        vo.setInlongStreamId(resultSet.getString("inlong_stream_id"));
                         vo.setLogTs(resultSet.getString("log_ts"));
                         vo.setCount(resultSet.getLong("total"));
                         vo.setDelay(resultSet.getLong("total_delay"));
                         vo.setSize(resultSet.getLong("total_size"));
                         auditSet.add(vo);
                     }
-                    result.add(new AuditVO(auditId, auditSet, auditIdMap.getOrDefault(auditId, null)));
+                    result.add(new AuditVO(auditId, auditName, auditSet, auditIdMap.getOrDefault(auditId, null)));
                 }
             }
         }
@@ -336,6 +353,12 @@ public class AuditServiceImpl implements AuditService {
         return aggregateByTimeDim(result, request.getTimeStaticsDim());
     }
 
+    @Override
+    public List<AuditBaseResponse> getAuditBases() {
+        List<AuditBaseEntity> auditBaseEntityList = auditBaseMapper.selectAll();
+        return CommonBeanUtils.copyListProperties(auditBaseEntityList, AuditBaseResponse::new);
+    }
+
     private List<String> getAuditIds(String groupId, String streamId, String sourceNodeType, String sinkNodeType) {
         Set<String> auditSet = LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)
                 ? new HashSet<>(auditIdListForAdmin)
@@ -420,9 +443,10 @@ public class AuditServiceImpl implements AuditService {
                 .toString();
 
         String sql = new SQL()
-                .SELECT("log_ts", "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size")
+                .SELECT("inlong_group_id", "inlong_stream_id", "log_ts", "sum(count) as total",
+                        "sum(delay) as total_delay", "sum(size) as total_size")
                 .FROM("(" + subQuery + ") as sub")
-                .GROUP_BY("log_ts")
+                .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id")
                 .ORDER_BY("log_ts")
                 .toString();
 
@@ -465,6 +489,7 @@ public class AuditServiceImpl implements AuditService {
             HashMap<String, AtomicLong> delayMap = new HashMap<>();
             HashMap<String, AtomicLong> sizeMap = new HashMap<>();
             statInfo.setAuditId(auditVO.getAuditId());
+            statInfo.setAuditName(auditVO.getAuditName());
             statInfo.setNodeType(auditVO.getNodeType());
             for (AuditInfo auditInfo : auditVO.getAuditSet()) {
                 String statKey = formatLogTime(auditInfo.getLogTs(), format);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
index a7128085e6..9c8beb0a4b 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.web.controller;
 
+import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
@@ -69,6 +70,12 @@ public class AuditController {
         return Response.success(auditService.updateAuditSource(request, LoginUserUtils.getLoginUser().getName()));
     }
 
+    @ApiOperation(value = "Get the audit base info")
+    @GetMapping("/audit/getAuditBases")
+    public Response<List<AuditBaseResponse>> getAuditBases() {
+        return Response.success(auditService.getAuditBases());
+    }
+
     @ApiOperation(value = "Get the audit source")
     @GetMapping("/audit/getSource")
     public Response<AuditSourceResponse> getAuditSource() {

Reply via email to