This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 764b1e5251 [INLONG-11811][Agent] Increase the retention time of 
offset, default to 7 days (#11812)
764b1e5251 is described below

commit 764b1e52517e60413ccec099d6e6f31224d2c868
Author: justinwwhuang <wenweihu...@apache.org>
AuthorDate: Wed Mar 26 17:10:19 2025 +0800

    [INLONG-11811][Agent] Increase the retention time of offset, default to 7 
days (#11812)
---
 .../main/java/org/apache/inlong/agent/constant/AgentConstants.java  | 5 +++--
 .../main/java/org/apache/inlong/agent/core/task/OffsetManager.java  | 6 ++++--
 .../inlong/agent/plugin/task/logcollection/LogAbstractTask.java     | 4 ++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 45ba789250..d9880800e6 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -77,10 +77,11 @@ public class AgentConstants {
     public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;
 
     public static final String AGENT_SCAN_RANGE = "agent.scan.range";
+    public static final String AGENT_OFFSET_TTL = "agent.offset.ttl";
     public static final String DEFAULT_AGENT_SCAN_RANGE = "-2";
     public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
-    public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
-    public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+    public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-2";
+    public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-120";
     public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit";
     public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100;
 
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index b82e399c81..bd2925305e 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -53,7 +53,7 @@ public class OffsetManager extends AbstractDaemon {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OffsetManager.class);
     public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
     public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000;
-    public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
+    public static final long SEVEN_DAY_TIMEOUT_INTERVAL_MS = 7 * 24 * 3600 * 
1000;
     private static volatile OffsetManager offsetManager = null;
     private final OffsetStore offsetStore;
     private final InstanceStore instanceStore;
@@ -163,7 +163,9 @@ public class OffsetManager extends AbstractDaemon {
                     }
                 }
             }
-            long expireTime = 
Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) + 
TWO_HOUR_TIMEOUT_INTERVAL;
+            long expireTime =
+                    Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) 
+ AgentConfiguration.getAgentConf()
+                            .getLong(AgentConstants.AGENT_OFFSET_TTL, 
SEVEN_DAY_TIMEOUT_INTERVAL_MS);
             if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > 
expireTime) {
                 cleanCount.getAndIncrement();
                 LOGGER.info("instance has expired, delete from instance store 
dataTime {} taskId {} instanceId {}",
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index 49e45ba751..1e354e7c4b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -45,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 public abstract class LogAbstractTask extends AbstractTask {
 
     private static final int INSTANCE_QUEUE_CAPACITY = 10;
-    public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000;
+    public static final long ONE_HOUR_TIMEOUT_INTERVAL_MS = 3600 * 1000;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogAbstractTask.class);
     protected boolean retry;
     protected BlockingQueue<InstanceProfile> instanceQueue;
@@ -210,7 +210,7 @@ public abstract class LogAbstractTask extends AbstractTask {
             String dataTime = entry.getKey();
             if (!DateUtils.isValidCreationTime(dataTime,
                     
Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit()))
-                            + ONE_HOUR_TIMEOUT_INTERVAL)) {
+                            + ONE_HOUR_TIMEOUT_INTERVAL_MS)) {
                 /* Remove it from memory map. */
                 eventMap.remove(dataTime);
                 LOGGER.warn("remove too old event from event map taskId {} 
dataTime {}", taskProfile.getTaskId(),

Reply via email to