This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit df6966bf815a2672ede47352376453cee6303a7a
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Fri Dec 8 11:05:21 2023 +0800

    [INLONG-9440][Manager] Support querying audit information based on ip (#9443)
    
    (cherry picked from commit 6e701e46f3ec8de9f31d9be8e07a2c6ac7a88467)
---
 .../manager/dao/mapper/AuditEntityMapper.java      | 16 +++++++
 .../main/resources/mappers/AuditEntityMapper.xml   | 14 +++++++
 .../inlong/manager/pojo/audit/AuditRequest.java    |  3 ++
 .../service/core/impl/AuditServiceImpl.java        | 49 +++++++++++++++++++---
 4 files changed, 76 insertions(+), 6 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
index 5030e0a08a..62330e6025 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java
@@ -43,4 +43,20 @@ public interface AuditEntityMapper {
             @Param(value = "sDate") String sDate,
             @Param(value = "eDate") String eDate,
             @Param(value = "format") String format);
+
+    /**
+     * sumByLogTsAndIp
+     *
+     * @param ip ip
+     * @param auditId The auditId of inlong
+     * @param sDate The start date
+     * @param eDate The end date
+     * @param format The format such as '%Y-%m-%d %H:%i:00'
+     * @return The result of query
+     */
+    List<Map<String, Object>> sumByLogTsAndIp(@Param(value = "ip") String ip,
+            @Param(value = "auditId") String auditId,
+            @Param(value = "sDate") String sDate,
+            @Param(value = "eDate") String eDate,
+            @Param(value = "format") String format);
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index cca03f50ef..e2090204ee 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -58,4 +58,18 @@
         group by log_ts, inlong_group_id, inlong_stream_id
         order by log_ts
     </select>
+
+    <select id="sumByLogTsAndIp" resultMap="SumByLogTsResultMap">
+        select inlong_group_id, inlong_stream_id, date_format(log_ts, 
#{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as 
total_delay, sum(`size`) as total_size
+        from (
+                 select distinct ip, docker_id, thread_id, sdk_ts, packet_id, 
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
+                 from apache_inlong_audit.audit_data
+                 where ip = #{ip,jdbcType=VARCHAR}
+                   and audit_id = #{auditId,jdbcType=VARCHAR}
+                   and log_ts &gt;= #{sDate, jdbcType=VARCHAR}
+                   and log_ts &lt; #{eDate, jdbcType=VARCHAR}
+             ) as sub
+        group by log_ts, inlong_group_id, inlong_stream_id
+        order by log_ts
+    </select>
 </mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
index 980b48c333..b3d685f0b3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java
@@ -40,6 +40,9 @@ public class AuditRequest {
     @ApiModelProperty(value = "inlong stream id")
     private String inlongStreamId;
 
+    @ApiModelProperty(value = "ip for current node")
+    private String ip;
+
     @ApiModelProperty(value = "audit id list")
     private List<String> auditIds;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 24478a3f39..f8ee0f618e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -294,8 +294,11 @@ public class AuditServiceImpl implements AuditService {
                 // Support min agg at now
                 DateTime endDate = 
DAY_DATE_FORMATTER.parseDateTime(request.getEndDate());
                 String endDateStr = 
endDate.plusDays(1).toString(DAY_DATE_FORMATTER);
-                List<Map<String, Object>> sumList = 
auditEntityMapper.sumByLogTs(
-                        groupId, streamId, auditId, request.getStartDate(), 
endDateStr, format);
+                List<Map<String, Object>> sumList =
+                        StringUtils.isNotBlank(request.getIp()) ? 
auditEntityMapper.sumByLogTsAndIp(request.getIp(),
+                                auditId, request.getStartDate(), endDateStr, 
format)
+                                : auditEntityMapper.sumByLogTs(groupId, 
streamId, auditId, request.getStartDate(),
+                                        endDateStr, format);
                 List<AuditInfo> auditSet = sumList.stream().map(s -> {
                     AuditInfo vo = new AuditInfo();
                     vo.setInlongGroupId((String) s.get("inlongGroupId"));
@@ -330,8 +333,10 @@ public class AuditServiceImpl implements AuditService {
                 }
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
                 try (Connection connection = config.getCkConnection();
-                        PreparedStatement statement = 
getAuditCkStatement(connection, groupId, streamId, auditId,
-                                request.getStartDate(), request.getEndDate());
+                        PreparedStatement statement = 
getAuditCkStatement(connection, groupId, streamId,
+                                request.getIp(), auditId,
+                                request.getStartDate(),
+                                request.getEndDate());
 
                         ResultSet resultSet = statement.executeQuery()) {
                     List<AuditInfo> auditSet = new ArrayList<>();
@@ -425,11 +430,13 @@ public class AuditServiceImpl implements AuditService {
      * @param endDate The en datetime of request
      * @return The clickhouse Statement
      */
-    private PreparedStatement getAuditCkStatement(Connection connection, 
String groupId, String streamId,
+    private PreparedStatement getAuditCkStatement(Connection connection, 
String groupId, String streamId, String ip,
             String auditId, String startDate, String endDate) throws 
SQLException {
         String start = 
DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
         String end = 
DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
-
+        if (StringUtils.isNotBlank(ip)) {
+            return getAuditCkStatementByIp(connection, auditId, ip, startDate, 
endDate);
+        }
         // Query results are duplicated according to all fields.
         String subQuery = new SQL()
                 .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", 
"packet_id", "log_ts", "inlong_group_id",
@@ -541,4 +548,34 @@ public class AuditServiceImpl implements AuditService {
         return formatDateString;
     }
 
+    private PreparedStatement getAuditCkStatementByIp(Connection connection, 
String auditId, String ip,
+            String startDate, String endDate) throws SQLException {
+        String start = 
DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT);
+        String end = 
DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT);
+        String subQuery = new SQL()
+                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", 
"packet_id", "log_ts", "inlong_group_id",
+                        "inlong_stream_id", "audit_id", "count", "size", 
"delay")
+                .FROM("audit_data")
+                .WHERE("ip = ?")
+                .WHERE("audit_id = ?")
+                .WHERE("log_ts >= ?")
+                .WHERE("log_ts < ?")
+                .toString();
+
+        String sql = new SQL()
+                .SELECT("inlong_group_id", "inlong_stream_id", "log_ts", 
"sum(count) as total",
+                        "sum(delay) as total_delay", "sum(size) as total_size")
+                .FROM("(" + subQuery + ") as sub")
+                .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id")
+                .ORDER_BY("log_ts")
+                .toString();
+
+        PreparedStatement statement = connection.prepareStatement(sql);
+        statement.setString(1, ip);
+        statement.setString(2, auditId);
+        statement.setString(3, start);
+        statement.setString(4, end);
+        return statement;
+    }
+
 }

Reply via email to