This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95bd84f5ce [INLONG-9289][Agent] Improve the completion judgment logic of collecting instances (#9290)
95bd84f5ce is described below

commit 95bd84f5ce41aa07fe1961c932fca1a8f25c4605
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 15 10:50:17 2023 +0800

    [INLONG-9289][Agent] Improve the completion judgment logic of collecting instances (#9290)
---
 .../inlong/agent/plugin/instance/FileInstance.java | 12 ++++++---
 .../agent/plugin/sinks/filecollect/ProxySink.java  | 30 ++++++++++++----------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 5ef904e200..13aef15f9d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -48,9 +48,11 @@ public class FileInstance extends Instance {
     private InstanceProfile profile;
     public static final int CORE_THREAD_SLEEP_TIME = 1;
     private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
     private InstanceManager instanceManager;
     private volatile boolean running = false;
     private volatile boolean inited = false;
+    private volatile int checkFinishCount = 0;
 
     @Override
     public void init(Object srcManager, InstanceProfile srcProfile) {
@@ -103,11 +105,15 @@ public class FileInstance extends Instance {
                 Message msg = source.read();
                 if (msg == null) {
                     if (source.sourceFinish() && sink.sinkFinish()) {
-                        handleReadEnd();
-                        break;
+                        checkFinishCount++;
+                        if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+                            handleReadEnd();
+                            break;
+                        }
                     } else {
-                        AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+                        checkFinishCount = 0;
                     }
+                    AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
                 } else {
                     sink.write(msg);
                 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index be7abce152..922400025e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
@@ -49,8 +48,8 @@ public class ProxySink extends AbstractSink {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
     private final int WRITE_FAILED_WAIT_TIME_MS = 10;
     private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
-    private final Integer FINISH_READ_MAX_COUNT = 30;
-    private static AtomicLong index = new AtomicLong(0);
+    private final Integer NO_WRITE_WAIT_AT_LEAST_MS = 5 * 1000;
+    private final Integer SINK_FINISH_AT_LEAST_COUNT = 5;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
@@ -62,18 +61,17 @@ public class ProxySink extends AbstractSink {
     private volatile boolean shutdown = false;
     private volatile boolean running = false;
     private volatile boolean inited = false;
-    private volatile int readEndCount = 0;
+    private volatile long lastWriteTime = 0;
+    private volatile long checkSinkFinishCount = 0;
 
     public ProxySink() {
     }
 
     @Override
     public void write(Message message) {
-        if (message == null) {
-            return;
-        }
         boolean suc = false;
-        while (running && !suc) {
+        while (!shutdown && !suc) {
+            lastWriteTime = AgentUtils.getCurrentTime();
             suc = putInCache(message);
             if (!suc) {
                 AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
@@ -146,7 +144,7 @@ public class ProxySink extends AbstractSink {
                 try {
                     SenderMessage senderMessage = cache.fetchSenderMessage();
                     if (senderMessage != null) {
-                        readEndCount = 0;
+                        checkSinkFinishCount = 0;
                         senderManager.sendBatch(senderMessage);
                         if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) {
                             lastPrintTime = AgentUtils.getCurrentTime();
@@ -156,8 +154,11 @@ public class ProxySink extends AbstractSink {
                                     profile.getInstanceId(),
                                     senderMessage.getDataTime());
                         }
+                    }
+                    if (noWriteLongEnough() && senderManager.sendFinished()) {
+                        checkSinkFinishCount++;
                     } else {
-                        readEndCount++;
+                        checkSinkFinishCount = 0;
                     }
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
@@ -211,15 +212,18 @@ public class ProxySink extends AbstractSink {
      */
     @Override
     public boolean sinkFinish() {
-        if (finishReadLog() && senderManager.sendFinished()) {
+        if (noWriteLongEnough() && sinkFinishLongEnough()) {
             return true;
         } else {
             return false;
         }
     }
 
-    public boolean finishReadLog() {
-        return readEndCount > FINISH_READ_MAX_COUNT;
+    public boolean noWriteLongEnough() {
+        return AgentUtils.getCurrentTime() - lastWriteTime > NO_WRITE_WAIT_AT_LEAST_MS;
     }
 
+    public boolean sinkFinishLongEnough() {
+        return checkSinkFinishCount > SINK_FINISH_AT_LEAST_COUNT;
+    }
 }