This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 3c5f52378c [INLONG-11524][Agent] Save offset before exiting to reduce data duplication (#11526)
3c5f52378c is described below

commit 3c5f52378ce1ecf9ce7d3abf453c0cf2e612d679
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 21 21:03:54 2024 +0800

    [INLONG-11524][Agent] Save offset before exiting to reduce data duplication (#11526)
---
 .../inlong/agent/plugin/sinks/ProxySink.java | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index b7ddc79a15..2f54ec59de 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -57,8 +57,9 @@ import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
 public class ProxySink extends AbstractSink {

     private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
-    private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private final int LOOP_WAIT_TIME_MS = 10;
     public final int SAVE_OFFSET_INTERVAL_MS = 1000;
+    public volatile long lastFlushOffset = AgentUtils.getCurrentTime();
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
@@ -193,12 +194,21 @@ public class ProxySink extends AbstractSink {
         if (!inited) {
             return;
         }
+        Long start = AgentUtils.getCurrentTime();
         shutdown = true;
         while (running || offsetRunning) {
-            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+            AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS);
         }
+        LOGGER.info("destroy proxySink wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         senderManager.Stop();
+        LOGGER.info("destroy proxySink wait sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         clearOffset();
+        LOGGER.info("destroy proxySink wait offset elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
         LOGGER.info("destroy sink {} end", sourceName);
     }

@@ -234,8 +244,11 @@ public class ProxySink extends AbstractSink {
             LOGGER.info("start flush offset {}:{}", inlongGroupId, sourceName);
             offsetRunning = true;
             while (!shutdown) {
-                doFlushOffset();
-                AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
+                if (AgentUtils.getCurrentTime() - lastFlushOffset > SAVE_OFFSET_INTERVAL_MS) {
+                    doFlushOffset();
+                    lastFlushOffset = AgentUtils.getCurrentTime();
+                }
+                AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS);
             }
             LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourceName);
             offsetRunning = false;
@@ -269,6 +282,7 @@ public class ProxySink extends AbstractSink {
     }

     private void clearOffset() {
+        doFlushOffset();
         packageAckInfoLock.writeLock().lock();
         for (int i = 0; i < ackInfoList.size();) {
             MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, ackInfoList.remove(i).getLen());