This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 702385afe9 [INLONG-9267][Agent] Fix bug: data loss when there are many files to read once (#9268)
702385afe9 is described below

commit 702385afe927ed229dec4a983281789782efa622
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Sun Nov 12 14:56:53 2023 +0800

    [INLONG-9267][Agent] Fix bug: data loss when there are many files to read once (#9268)
---
 .../inlong/agent/plugin/sinks/filecollect/ProxySink.java | 15 ++++++++++++++-
 .../inlong/agent/plugin/sources/LogFileSource.java       | 16 ++++++++++++----
 .../inlong/agent/plugin/sources/TestLogFileSource.java   |  6 ++++--
 3 files changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 806612e661..be7abce152 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -49,6 +49,7 @@ public class ProxySink extends AbstractSink {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
     private final int WRITE_FAILED_WAIT_TIME_MS = 10;
     private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private final Integer FINISH_READ_MAX_COUNT = 30;
     private static AtomicLong index = new AtomicLong(0);
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
@@ -61,6 +62,7 @@ public class ProxySink extends AbstractSink {
     private volatile boolean shutdown = false;
     private volatile boolean running = false;
     private volatile boolean inited = false;
+    private volatile int readEndCount = 0;
 
     public ProxySink() {
     }
@@ -144,6 +146,7 @@ public class ProxySink extends AbstractSink {
                 try {
                     SenderMessage senderMessage = cache.fetchSenderMessage();
                     if (senderMessage != null) {
+                        readEndCount = 0;
                         senderManager.sendBatch(senderMessage);
                         if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
                             lastPrintTime = AgentUtils.getCurrentTime();
@@ -153,6 +156,8 @@ public class ProxySink extends AbstractSink {
                                     profile.getInstanceId(),
                                     senderMessage.getDataTime());
                         }
+                    } else {
+                        readEndCount++;
                     }
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
@@ -206,7 +211,15 @@ public class ProxySink extends AbstractSink {
      */
     @Override
     public boolean sinkFinish() {
-        return cache.isEmpty() && senderManager.sendFinished();
+        if (finishReadLog() && senderManager.sendFinished()) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public boolean finishReadLog() {
+        return readEndCount > FINISH_READ_MAX_COUNT;
     }
 
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 7b94e3f068..089f0fee75 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -132,6 +132,7 @@ public class LogFileSource extends AbstractSource {
     private volatile boolean running = false;
 
     public LogFileSource() {
+        OffsetManager.init();
     }
 
     @Override
@@ -175,7 +176,7 @@ public class LogFileSource extends AbstractSource {
     }
 
     private long getInitLineOffset(boolean isIncrement, String taskId, String 
instanceId, String inodeInfo) {
-        OffsetProfile offsetProfile = OffsetManager.init().getOffset(taskId, 
instanceId);
+        OffsetProfile offsetProfile = 
OffsetManager.getInstance().getOffset(taskId, instanceId);
         int fileLineCount = getRealLineCount(instanceId);
         long offset = 0;
         if (offsetProfile != null && 
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
@@ -334,9 +335,8 @@ public class LogFileSource extends AbstractSource {
         }
         if (sourceData == null) {
             return null;
-        } else {
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
         }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
         Message finalMsg = createMessage(sourceData);
         return finalMsg;
     }
@@ -377,7 +377,7 @@ public class LogFileSource extends AbstractSource {
             suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
             if (!suc) {
                 MemoryManager.getInstance().printDetail(permitName, "log file 
source");
-                if (!isInodeChanged() || !isRunnable()) {
+                if (isInodeChanged() || !isRunnable()) {
                     return false;
                 }
                 AgentUtils.silenceSleepInSeconds(1);
@@ -530,6 +530,14 @@ public class LogFileSource extends AbstractSource {
 
     @Override
     public boolean sourceFinish() {
+        if (finishReadLog() && queue.isEmpty()) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public boolean finishReadLog() {
         return readEndCount > FINISH_READ_MAX_COUNT;
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index eb9c8ccf89..b2a3b605b7 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -101,7 +101,7 @@ public class TestLogFileSource {
             srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
         }
         LogFileSource source = getSource();
-        await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
source.finishReadLog());
         int cnt = 0;
         int leftBeforeRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
         Assert.assertTrue(leftBeforeRead + srcLen == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
@@ -114,6 +114,7 @@ public class TestLogFileSource {
             msg = source.read();
             cnt++;
         }
+        await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
         source.destroy();
         Assert.assertTrue(cnt == 3);
         Assert.assertTrue(srcLen == readLen);
@@ -123,10 +124,11 @@ public class TestLogFileSource {
 
     private void testCleanQueue() {
         LogFileSource source = getSource();
-        await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
source.finishReadLog());
         for (int i = 0; i < 2; i++) {
             source.read();
         }
+        Assert.assertTrue(!source.sourceFinish());
         source.destroy();
         int leftAfterRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
         Assert.assertTrue(leftAfterRead == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);

Reply via email to