This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3de83614e8 [INLONG-11762][Agent] Modify the logic for determining the end of the data source (#11763) 3de83614e8 is described below commit 3de83614e89ebdfac9e5f10727f3e31368a55617 Author: justinwwhuang <wenweihu...@apache.org> AuthorDate: Fri Feb 14 15:59:18 2025 +0800 [INLONG-11762][Agent] Modify the logic for determining the end of the data source (#11763) --- .../inlong/agent/plugin/sources/LogFileSource.java | 6 ++++++ .../agent/plugin/sources/file/AbstractSource.java | 17 +++++++++++++++-- .../inlong/agent/plugin/sources/TestLogFileSource.java | 2 +- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 6774751a34..a78abe3305 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -30,6 +30,7 @@ import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.file.FileUtils; import lombok.AllArgsConstructor; import lombok.Data; @@ -374,4 +375,9 @@ public class LogFileSource extends AbstractSource { } } } + + @Override + public long getLastModifyTime() { + return FileUtils.getFileLastModifyTime(fileName); + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index df3d14652e..419d511bf0 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -81,7 +81,7 @@ public abstract class AbstractSource implements Source { protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; protected final Integer WAIT_TIMEOUT_MS = 10; - private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100; + private final Integer SOURCE_NO_UPDATE_INTERVAL_MS = 5 * 60 * 1000; private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; protected BlockingQueue<SourceData> queue; @@ -429,6 +429,19 @@ public abstract class AbstractSource implements Source { if (isRealTime) { return false; } - return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST; + if (emptyCount == 0) { + return false; + } + if (profile.isRetry()) { + return true; + } + if (AgentUtils.getCurrentTime() - getLastModifyTime() > SOURCE_NO_UPDATE_INTERVAL_MS) { + return true; + } + return false; + } + + public long getLastModifyTime() { + return 0; } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 2bbe41859a..1fefbbc1d2 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -95,7 +95,7 @@ public class TestLogFileSource { Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10); Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0); Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2); - Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3); + Whitebox.setInternalState(source, "SOURCE_NO_UPDATE_INTERVAL_MS", 1000); Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10); if (lineOffset > 0) { String finalOffset = Long.toString(lineOffset);