This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 702385afe9 [INLONG-9267][Agent] Fix bug: data loss when there are many files to read once (#9268) 702385afe9 is described below commit 702385afe927ed229dec4a983281789782efa622 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Sun Nov 12 14:56:53 2023 +0800 [INLONG-9267][Agent] Fix bug: data loss when there are many files to read once (#9268) --- .../inlong/agent/plugin/sinks/filecollect/ProxySink.java | 15 ++++++++++++++- .../inlong/agent/plugin/sources/LogFileSource.java | 16 ++++++++++++---- .../inlong/agent/plugin/sources/TestLogFileSource.java | 6 ++++-- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java index 806612e661..be7abce152 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java @@ -49,6 +49,7 @@ public class ProxySink extends AbstractSink { private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class); private final int WRITE_FAILED_WAIT_TIME_MS = 10; private final int DESTROY_LOOP_WAIT_TIME_MS = 10; + private final Integer FINISH_READ_MAX_COUNT = 30; private static AtomicLong index = new AtomicLong(0); private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, @@ -61,6 +62,7 @@ public class ProxySink extends AbstractSink { private volatile boolean shutdown = false; private volatile boolean running = false; private volatile boolean inited = false; + private volatile int readEndCount = 0; public ProxySink() { } @@ -144,6 +146,7 @@ public class ProxySink extends AbstractSink { try { SenderMessage senderMessage = 
cache.fetchSenderMessage(); if (senderMessage != null) { + readEndCount = 0; senderManager.sendBatch(senderMessage); if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) { lastPrintTime = AgentUtils.getCurrentTime(); @@ -153,6 +156,8 @@ public class ProxySink extends AbstractSink { profile.getInstanceId(), senderMessage.getDataTime()); } + } else { + readEndCount++; } } catch (Exception ex) { LOGGER.error("error caught", ex); @@ -206,7 +211,15 @@ public class ProxySink extends AbstractSink { */ @Override public boolean sinkFinish() { - return cache.isEmpty() && senderManager.sendFinished(); + if (finishReadLog() && senderManager.sendFinished()) { + return true; + } else { + return false; + } + } + + public boolean finishReadLog() { + return readEndCount > FINISH_READ_MAX_COUNT; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 7b94e3f068..089f0fee75 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -132,6 +132,7 @@ public class LogFileSource extends AbstractSource { private volatile boolean running = false; public LogFileSource() { + OffsetManager.init(); } @Override @@ -175,7 +176,7 @@ public class LogFileSource extends AbstractSource { } private long getInitLineOffset(boolean isIncrement, String taskId, String instanceId, String inodeInfo) { - OffsetProfile offsetProfile = OffsetManager.init().getOffset(taskId, instanceId); + OffsetProfile offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId); int fileLineCount = getRealLineCount(instanceId); long offset = 0; if (offsetProfile != null && offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) { @@ -334,9 +335,8 @@ public class 
LogFileSource extends AbstractSource { } if (sourceData == null) { return null; - } else { - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length()); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length()); Message finalMsg = createMessage(sourceData); return finalMsg; } @@ -377,7 +377,7 @@ public class LogFileSource extends AbstractSource { suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); if (!suc) { MemoryManager.getInstance().printDetail(permitName, "log file source"); - if (!isInodeChanged() || !isRunnable()) { + if (isInodeChanged() || !isRunnable()) { return false; } AgentUtils.silenceSleepInSeconds(1); @@ -530,6 +530,14 @@ public class LogFileSource extends AbstractSource { @Override public boolean sourceFinish() { + if (finishReadLog() && queue.isEmpty()) { + return true; + } else { + return false; + } + } + + public boolean finishReadLog() { return readEndCount > FINISH_READ_MAX_COUNT; } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index eb9c8ccf89..b2a3b605b7 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -101,7 +101,7 @@ public class TestLogFileSource { srcLen += check[i].getBytes(StandardCharsets.UTF_8).length; } LogFileSource source = getSource(); - await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish()); + await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog()); int cnt = 0; int leftBeforeRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT); Assert.assertTrue(leftBeforeRead + srcLen == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT); @@ -114,6 +114,7 
@@ public class TestLogFileSource { msg = source.read(); cnt++; } + await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish()); source.destroy(); Assert.assertTrue(cnt == 3); Assert.assertTrue(srcLen == readLen); @@ -123,10 +124,11 @@ public class TestLogFileSource { private void testCleanQueue() { LogFileSource source = getSource(); - await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish()); + await().atMost(2, TimeUnit.SECONDS).until(() -> source.finishReadLog()); for (int i = 0; i < 2; i++) { source.read(); } + Assert.assertTrue(!source.sourceFinish()); source.destroy(); int leftAfterRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT); Assert.assertTrue(leftAfterRead == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);