This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95bd84f5ce [INLONG-9289][Agent] Improve the completion judgment logic of collecting instances (#9290)
95bd84f5ce is described below

commit 95bd84f5ce41aa07fe1961c932fca1a8f25c4605
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 15 10:50:17 2023 +0800

    [INLONG-9289][Agent] Improve the completion judgment logic of collecting instances (#9290)
---
 .../inlong/agent/plugin/instance/FileInstance.java | 12 ++++++---
 .../agent/plugin/sinks/filecollect/ProxySink.java  | 30 ++++++++++++----------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 5ef904e200..13aef15f9d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -48,9 +48,11 @@ public class FileInstance extends Instance {
     private InstanceProfile profile;
     public static final int CORE_THREAD_SLEEP_TIME = 1;
     private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
     private InstanceManager instanceManager;
     private volatile boolean running = false;
     private volatile boolean inited = false;
+    private volatile int checkFinishCount = 0;
 
     @Override
     public void init(Object srcManager, InstanceProfile srcProfile) {
@@ -103,11 +105,15 @@ public class FileInstance extends Instance {
             Message msg = source.read();
             if (msg == null) {
                 if (source.sourceFinish() && sink.sinkFinish()) {
-                    handleReadEnd();
-                    break;
+                    checkFinishCount++;
+                    if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+                        handleReadEnd();
+                        break;
+                    }
                 } else {
-                    AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+                    checkFinishCount = 0;
                 }
+                AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
             } else {
                 sink.write(msg);
             }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index be7abce152..922400025e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
@@ -49,8 +48,8 @@ public class ProxySink extends AbstractSink {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
     private final int WRITE_FAILED_WAIT_TIME_MS = 10;
     private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
-    private final Integer FINISH_READ_MAX_COUNT = 30;
-    private static AtomicLong index = new AtomicLong(0);
+    private final Integer NO_WRITE_WAIT_AT_LEAST_MS = 5 * 1000;
+    private final Integer SINK_FINISH_AT_LEAST_COUNT = 5;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
@@ -62,18 +61,17 @@ public class ProxySink extends AbstractSink {
     private volatile boolean shutdown = false;
     private volatile boolean running = false;
     private volatile boolean inited = false;
-    private volatile int readEndCount = 0;
+    private volatile long lastWriteTime = 0;
+    private volatile long checkSinkFinishCount = 0;
 
     public ProxySink() {
     }
 
     @Override
     public void write(Message message) {
-        if (message == null) {
-            return;
-        }
         boolean suc = false;
-        while (running && !suc) {
+        while (!shutdown && !suc) {
+            lastWriteTime = AgentUtils.getCurrentTime();
             suc = putInCache(message);
             if (!suc) {
                 AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
@@ -146,7 +144,7 @@ public class ProxySink extends AbstractSink {
                 try {
                     SenderMessage senderMessage = cache.fetchSenderMessage();
                     if (senderMessage != null) {
-                        readEndCount = 0;
+                        checkSinkFinishCount = 0;
                         senderManager.sendBatch(senderMessage);
                         if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
                             lastPrintTime = AgentUtils.getCurrentTime();
@@ -156,8 +154,11 @@ public class ProxySink extends AbstractSink {
                                     profile.getInstanceId(),
                                     senderMessage.getDataTime());
                         }
+                    }
+                    if (noWriteLongEnough() && senderManager.sendFinished()) {
+                        checkSinkFinishCount++;
                     } else {
-                        readEndCount++;
+                        checkSinkFinishCount = 0;
                     }
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
@@ -211,15 +212,18 @@ public class ProxySink extends AbstractSink {
      */
     @Override
     public boolean sinkFinish() {
-        if (finishReadLog() && senderManager.sendFinished()) {
+        if (noWriteLongEnough() && sinkFinishLongEnough()) {
             return true;
         } else {
             return false;
         }
     }
 
-    public boolean finishReadLog() {
-        return readEndCount > FINISH_READ_MAX_COUNT;
+    public boolean noWriteLongEnough() {
+        return AgentUtils.getCurrentTime() - lastWriteTime > 
NO_WRITE_WAIT_AT_LEAST_MS;
     }
 
+    public boolean sinkFinishLongEnough() {
+        return checkSinkFinishCount > SINK_FINISH_AT_LEAST_COUNT;
+    }
 }

Reply via email to