This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new af93ac24b7 [INLONG-9369][Agent] Increase sending failure audit and real-time audit (#9370) af93ac24b7 is described below commit af93ac24b72b90e488233a87e1a2fd62a7f9ad72 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Nov 30 18:10:49 2023 +0800 [INLONG-9369][Agent] Increase sending failure audit and real-time audit (#9370) * [INLONG-9369][Agent] Increase sending failure audit and real-time audit * [INLONG-9369][Agent] Increase sending failure audit and real-time audit --- .../main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java | 4 ++++ .../apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java | 6 ++++++ .../java/org/apache/inlong/agent/plugin/sources/LogFileSource.java | 4 ++++ 3 files changed, 14 insertions(+) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index 68a4e64842..b4eda2ce60 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -42,6 +42,10 @@ public class AuditUtils { public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000; public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3; public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4; + public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 25; + public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 26; + public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004; + public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026; private static boolean IS_AUDIT = true; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 45fe9bc63e..d027672b18 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -346,11 +346,17 @@ public class SenderManager { getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, message.getMsgCnt(), message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId, streamId, + AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize()); } else { LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt); putInResendQueue(new AgentSenderCallback(message, retry)); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId, streamId, + dataTime, message.getMsgCnt(), message.getTotalSize()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId, streamId, + AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize()); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 7c719b1e47..0cc97afb8c 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -319,6 +319,8 @@ public class LogFileSource extends AbstractSource { if (overLen) { LOGGER.warn("readLines over len finally string len {}", new String(baos.toByteArray()).length()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, + inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize); } baos.reset(); overLen = false; @@ -382,6 +384,8 @@ public class LogFileSource extends AbstractSource { } AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), auditTime, 1, msgWithMetaData.length()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), + AgentUtils.getCurrentTime(), 1, msgWithMetaData.length()); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); // if the message size is greater than max pack size,should drop it. if (finalMsg.getBody().length > maxPackSize) {