This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3de83614e8 [INLONG-11762][Agent] Modify the logic for determining the end of the data source (#11763)
3de83614e8 is described below

commit 3de83614e89ebdfac9e5f10727f3e31368a55617
Author: justinwwhuang <wenweihu...@apache.org>
AuthorDate: Fri Feb 14 15:59:18 2025 +0800

    [INLONG-11762][Agent] Modify the logic for determining the end of the data source (#11763)
---
 .../inlong/agent/plugin/sources/LogFileSource.java      |  6 ++++++
 .../agent/plugin/sources/file/AbstractSource.java       | 17 +++++++++++++++--
 .../inlong/agent/plugin/sources/TestLogFileSource.java  |  2 +-
 3 files changed, 22 insertions(+), 3 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 6774751a34..a78abe3305 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -30,6 +30,7 @@ import 
org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.file.FileUtils;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -374,4 +375,9 @@ public class LogFileSource extends AbstractSource {
             }
         }
     }
+
+    @Override
+    public long getLastModifyTime() {
+        return FileUtils.getFileLastModifyTime(fileName);
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index df3d14652e..419d511bf0 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -81,7 +81,7 @@ public abstract class AbstractSource implements Source {
     protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
     protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
     protected final Integer WAIT_TIMEOUT_MS = 10;
-    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
+    private final Integer SOURCE_NO_UPDATE_INTERVAL_MS = 5 * 60 * 1000;
     private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
     protected BlockingQueue<SourceData> queue;
 
@@ -429,6 +429,19 @@ public abstract class AbstractSource implements Source {
         if (isRealTime) {
             return false;
         }
-        return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
+        if (emptyCount == 0) {
+            return false;
+        }
+        if (profile.isRetry()) {
+            return true;
+        }
+        if (AgentUtils.getCurrentTime() - getLastModifyTime() > 
SOURCE_NO_UPDATE_INTERVAL_MS) {
+            return true;
+        }
+        return false;
+    }
+
+    public long getLastModifyTime() {
+        return 0;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 2bbe41859a..1fefbbc1d2 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -95,7 +95,7 @@ public class TestLogFileSource {
             Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
             Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 
0);
             Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 
2);
-            Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
+            Whitebox.setInternalState(source, "SOURCE_NO_UPDATE_INTERVAL_MS", 
1000);
             Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
             if (lineOffset > 0) {
                 String finalOffset = Long.toString(lineOffset);

Reply via email to