This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
 new 764b1e5251 [INLONG-11811][Agent] Increase the retention time of offset, default to 7 days (#11812)
764b1e5251 is described below

commit 764b1e52517e60413ccec099d6e6f31224d2c868
Author: justinwwhuang <wenweihu...@apache.org>
AuthorDate: Wed Mar 26 17:10:19 2025 +0800

 [INLONG-11811][Agent] Increase the retention time of offset, default to 7 days (#11812)
---
 .../main/java/org/apache/inlong/agent/constant/AgentConstants.java | 5 +++--
 .../main/java/org/apache/inlong/agent/core/task/OffsetManager.java | 6 ++++--
 .../inlong/agent/plugin/task/logcollection/LogAbstractTask.java | 4 ++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 45ba789250..d9880800e6 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -77,10 +77,11 @@ public class AgentConstants {
 public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;

 public static final String AGENT_SCAN_RANGE = "agent.scan.range";
+ public static final String AGENT_OFFSET_TTL = "agent.offset.ttl";
 public static final String DEFAULT_AGENT_SCAN_RANGE = "-2";
 public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
- public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
- public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-2";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-120";

 public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit";
 public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index b82e399c81..bd2925305e 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -53,7 +53,7 @@ public class OffsetManager extends AbstractDaemon {
 private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
 public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
 public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000;
- public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
+ public static final long SEVEN_DAY_TIMEOUT_INTERVAL_MS = 7 * 24 * 3600 * 1000;
 private static volatile OffsetManager offsetManager = null;
 private final OffsetStore offsetStore;
 private final InstanceStore instanceStore;
@@ -163,7 +163,9 @@ public class OffsetManager extends AbstractDaemon {
 }
 }
 }
- long expireTime = Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) + TWO_HOUR_TIMEOUT_INTERVAL;
+ long expireTime =
+ Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) + AgentConfiguration.getAgentConf()
+ .getLong(AgentConstants.AGENT_OFFSET_TTL, SEVEN_DAY_TIMEOUT_INTERVAL_MS);
 if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) {
 cleanCount.getAndIncrement();
 LOGGER.info("instance has expired, delete from instance store dataTime {} taskId {} instanceId {}",
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index 49e45ba751..1e354e7c4b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -45,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 public abstract class LogAbstractTask extends AbstractTask {

 private static final int INSTANCE_QUEUE_CAPACITY = 10;
- public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000;
+ public static final long ONE_HOUR_TIMEOUT_INTERVAL_MS = 3600 * 1000;
 private static final Logger LOGGER = LoggerFactory.getLogger(LogAbstractTask.class);
 protected boolean retry;
 protected BlockingQueue<InstanceProfile> instanceQueue;
@@ -210,7 +210,7 @@ public abstract class LogAbstractTask extends AbstractTask {
 String dataTime = entry.getKey();
 if (!DateUtils.isValidCreationTime(dataTime,
 Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit()))
- + ONE_HOUR_TIMEOUT_INTERVAL)) {
+ + ONE_HOUR_TIMEOUT_INTERVAL_MS)) {
 /* Remove it from memory map. */
 eventMap.remove(dataTime);
 LOGGER.warn("remove too old event from event map taskId {} dataTime {}", taskProfile.getTaskId(),