This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.10 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit df6966bf815a2672ede47352376453cee6303a7a Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri Dec 8 11:05:21 2023 +0800 [INLONG-9440][Manager] Support querying audit information based on ip (#9443) (cherry picked from commit 6e701e46f3ec8de9f31d9be8e07a2c6ac7a88467) --- .../manager/dao/mapper/AuditEntityMapper.java | 16 +++++++ .../main/resources/mappers/AuditEntityMapper.xml | 14 +++++++ .../inlong/manager/pojo/audit/AuditRequest.java | 3 ++ .../service/core/impl/AuditServiceImpl.java | 49 +++++++++++++++++++--- 4 files changed, 76 insertions(+), 6 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java index 5030e0a08a..62330e6025 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditEntityMapper.java @@ -43,4 +43,20 @@ public interface AuditEntityMapper { @Param(value = "sDate") String sDate, @Param(value = "eDate") String eDate, @Param(value = "format") String format); + + /** + * sumByLogTsAndIp + * + * @param ip ip + * @param auditId The auditId of inlong + * @param sDate The start date + * @param eDate The end date + * @param format The format such as '%Y-%m-%d %H:%i:00' + * @return The result of query + */ + List<Map<String, Object>> sumByLogTsAndIp(@Param(value = "ip") String ip, + @Param(value = "auditId") String auditId, + @Param(value = "sDate") String sDate, + @Param(value = "eDate") String eDate, + @Param(value = "format") String format); } \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml index cca03f50ef..e2090204ee 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml @@ -58,4 +58,18 @@ group by log_ts, inlong_group_id, inlong_stream_id order by log_ts </select> + + <select id="sumByLogTsAndIp" resultMap="SumByLogTsResultMap"> + select inlong_group_id, inlong_stream_id, date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts, sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size + from ( + select distinct ip, docker_id, thread_id, sdk_ts, packet_id, log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay` + from apache_inlong_audit.audit_data + where ip = #{ip,jdbcType=VARCHAR} + and audit_id = #{auditId,jdbcType=VARCHAR} + and log_ts >= #{sDate, jdbcType=VARCHAR} + and log_ts < #{eDate, jdbcType=VARCHAR} + ) as sub + group by log_ts, inlong_group_id, inlong_stream_id + order by log_ts + </select> </mapper> \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java index 980b48c333..b3d685f0b3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditRequest.java @@ -40,6 +40,9 @@ public class AuditRequest { @ApiModelProperty(value = "inlong stream id") private String inlongStreamId; + @ApiModelProperty(value = "ip for current node") + private String ip; + @ApiModelProperty(value = "audit id list") private List<String> auditIds; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 24478a3f39..f8ee0f618e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -294,8 +294,11 @@ public class AuditServiceImpl implements AuditService { // Support min agg at now DateTime endDate = DAY_DATE_FORMATTER.parseDateTime(request.getEndDate()); String endDateStr = endDate.plusDays(1).toString(DAY_DATE_FORMATTER); - List<Map<String, Object>> sumList = auditEntityMapper.sumByLogTs( - groupId, streamId, auditId, request.getStartDate(), endDateStr, format); + List<Map<String, Object>> sumList = + StringUtils.isNotBlank(request.getIp()) ? auditEntityMapper.sumByLogTsAndIp(request.getIp(), + auditId, request.getStartDate(), endDateStr, format) + : auditEntityMapper.sumByLogTs(groupId, streamId, auditId, request.getStartDate(), + endDateStr, format); List<AuditInfo> auditSet = sumList.stream().map(s -> { AuditInfo vo = new AuditInfo(); vo.setInlongGroupId((String) s.get("inlongGroupId")); @@ -330,8 +333,10 @@ public class AuditServiceImpl implements AuditService { } } else if (AuditQuerySource.CLICKHOUSE == querySource) { try (Connection connection = config.getCkConnection(); - PreparedStatement statement = getAuditCkStatement(connection, groupId, streamId, auditId, - request.getStartDate(), request.getEndDate()); + PreparedStatement statement = getAuditCkStatement(connection, groupId, streamId, + request.getIp(), auditId, + request.getStartDate(), + request.getEndDate()); ResultSet resultSet = statement.executeQuery()) { List<AuditInfo> auditSet = new ArrayList<>(); @@ -425,11 +430,13 @@ public class AuditServiceImpl implements AuditService { * @param endDate The en datetime of request * @return The clickhouse Statement */ - private PreparedStatement getAuditCkStatement(Connection connection, String groupId, String streamId, + private PreparedStatement getAuditCkStatement(Connection connection, String groupId, String streamId, String ip, String auditId, String startDate, String endDate) throws SQLException { String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT); String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT); - + if (StringUtils.isNotBlank(ip)) { + return getAuditCkStatementByIp(connection, auditId, ip, startDate, endDate); + } // Query results are duplicated according to all fields. String subQuery = new SQL() .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id", @@ -541,4 +548,34 @@ public class AuditServiceImpl implements AuditService { return formatDateString; } + private PreparedStatement getAuditCkStatementByIp(Connection connection, String auditId, String ip, + String startDate, String endDate) throws SQLException { + String start = DAY_DATE_FORMATTER.parseDateTime(startDate).toString(SECOND_FORMAT); + String end = DAY_DATE_FORMATTER.parseDateTime(endDate).plusDays(1).toString(SECOND_FORMAT); + String subQuery = new SQL() + .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", "log_ts", "inlong_group_id", + "inlong_stream_id", "audit_id", "count", "size", "delay") + .FROM("audit_data") + .WHERE("ip = ?") + .WHERE("audit_id = ?") + .WHERE("log_ts >= ?") + .WHERE("log_ts < ?") + .toString(); + + String sql = new SQL() + .SELECT("inlong_group_id", "inlong_stream_id", "log_ts", "sum(count) as total", + "sum(delay) as total_delay", "sum(size) as total_size") + .FROM("(" + subQuery + ") as sub") + .GROUP_BY("log_ts", "inlong_group_id", "inlong_stream_id") + .ORDER_BY("log_ts") + .toString(); + + PreparedStatement statement = connection.prepareStatement(sql); + statement.setString(1, ip); + statement.setString(2, auditId); + statement.setString(3, start); + statement.setString(4, end); + return statement; + } + }