This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new e51e9edfc9 [INLONG-9265][Agent] Add audit of agent send success (#9266)
e51e9edfc9 is described below

commit e51e9edfc913c6c6bc55df111531a1c8254b82ff
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Sun Nov 12 14:57:39 2023 +0800

    [INLONG-9265][Agent] Add audit of agent send success (#9266)
---
 .../apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index abb21fe8ba..d13ef19963 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
 import org.apache.inlong.agent.message.filecollect.SenderMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.message.SequentialID;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -122,7 +122,6 @@ public class SenderManager {
     private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
     private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true);
     protected InstanceProfile profile;
-    private Random testRandom = new Random();
     private volatile boolean offsetRunning = false;
     private volatile boolean resendRunning = false;
     private volatile boolean started = false;
@@ -175,7 +174,6 @@ public class SenderManager {
         this.metricItemSet = new AgentMetricItemSet(metricName);
         MetricRegister.register(metricItemSet);
         resendQueue = new LinkedBlockingQueue<>();
-
     }
 
     public void Start() throws Exception {
@@ -429,6 +427,8 @@ public class SenderManager {
                 if (result != null && result.equals(SendResult.OK)) {
                     message.getAckInfo().setHasAck(true);
                     getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
+                            System.currentTimeMillis(), message.getMsgCnt(), message.getTotalSize());
                 } else {
                     LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
                             + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);