This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c5f52378c [INLONG-11524][Agent] Save offset before exiting to reduce data duplication (#11526)
3c5f52378c is described below

commit 3c5f52378ce1ecf9ce7d3abf453c0cf2e612d679
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 21 21:03:54 2024 +0800

    [INLONG-11524][Agent] Save offset before exiting to reduce data duplication (#11526)
---
 .../inlong/agent/plugin/sinks/ProxySink.java       | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index b7ddc79a15..2f54ec59de 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -57,8 +57,9 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
 public class ProxySink extends AbstractSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
-    private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private final int LOOP_WAIT_TIME_MS = 10;
     public final int SAVE_OFFSET_INTERVAL_MS = 1000;
+    public volatile long lastFlushOffset = AgentUtils.getCurrentTime();
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
@@ -193,12 +194,21 @@ public class ProxySink extends AbstractSink {
         if (!inited) {
             return;
         }
+        Long start = AgentUtils.getCurrentTime();
         shutdown = true;
         while (running || offsetRunning) {
-            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+            AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS);
         }
+        LOGGER.info("destroy proxySink wait run elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         senderManager.Stop();
+        LOGGER.info("destroy proxySink wait sender elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         clearOffset();
+        LOGGER.info("destroy proxySink wait offset elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
         LOGGER.info("destroy sink {} end", sourceName);
     }
 
@@ -234,8 +244,11 @@ public class ProxySink extends AbstractSink {
             LOGGER.info("start flush offset {}:{}", inlongGroupId, sourceName);
             offsetRunning = true;
             while (!shutdown) {
-                doFlushOffset();
-                AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
+                if (AgentUtils.getCurrentTime() - lastFlushOffset > 
SAVE_OFFSET_INTERVAL_MS) {
+                    doFlushOffset();
+                    lastFlushOffset = AgentUtils.getCurrentTime();
+                }
+                AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS);
             }
             LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourceName);
             offsetRunning = false;
@@ -269,6 +282,7 @@ public class ProxySink extends AbstractSink {
     }
 
     private void clearOffset() {
+        doFlushOffset();
         packageAckInfoLock.writeLock().lock();
         for (int i = 0; i < ackInfoList.size();) {
             MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
ackInfoList.remove(i).getLen());

Reply via email to