This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e87f3605d4 [INLONG-9366][Agent] Remove useless offset record (#9367) e87f3605d4 is described below commit e87f3605d485b14a4c548de3afcb1c375650530d Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Nov 30 14:05:42 2023 +0800 [INLONG-9366][Agent] Remove useless offset record (#9367) --- .../java/org/apache/inlong/agent/db/OffsetDb.java | 20 +++++-- .../org/apache/inlong/agent/core/AgentManager.java | 2 - .../agent/core/instance/InstanceManager.java | 2 +- .../inlong/agent/core/task/OffsetManager.java | 64 ++++++++++++++++++++-- .../inlong/agent/core/task/file/TaskManager.java | 3 + .../inlong/agent/plugin/instance/FileInstance.java | 2 +- .../agent/plugin/sinks/filecollect/ProxySink.java | 2 +- .../inlong/agent/plugin/sources/LogFileSource.java | 1 - .../agent/plugin/sources/TestLogFileSource.java | 12 ++++ 9 files changed, 90 insertions(+), 18 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java index 5ceeb2e4ea..5c31a2f88a 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java @@ -17,9 +17,7 @@ package org.apache.inlong.agent.db; -import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.OffsetProfile; -import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.CommonConstants; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.utils.AgentUtils; @@ -27,6 +25,9 @@ import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + /** * db interface for task profile. */ @@ -34,11 +35,9 @@ public class OffsetDb { private static final Logger LOGGER = LoggerFactory.getLogger(OffsetDb.class); private final Db db; - private final AgentConfiguration agentConf; - public OffsetDb() { - agentConf = AgentConfiguration.getAgentConf(); - db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET)); + public OffsetDb(Db db) { + this.db = db; } /** @@ -54,6 +53,15 @@ public class OffsetDb { } } + public List<OffsetProfile> listAllOffsets() { + List<KeyValueEntity> result = this.db.findAll(""); + List<OffsetProfile> offsetList = new ArrayList<>(); + for (KeyValueEntity entity : result) { + offsetList.add(entity.getAsOffsetProfile()); + } + return offsetList; + } + public OffsetProfile getOffset(String taskId, String instanceId) { KeyValueEntity result = db.get(getKey(taskId, instanceId)); if (result == null) { diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index 5c28e172b6..29c000dcc4 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.ProfileFetcher; import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.file.TaskManager; import org.slf4j.Logger; @@ -48,7 +47,6 @@ public class AgentManager extends AbstractDaemon { public AgentManager() { conf = AgentConfiguration.getAgentConf(); agentConfMonitor = Executors.newSingleThreadExecutor(); - OffsetManager.init(); taskManager = new TaskManager(); fetcher = initFetcher(this); heartbeatManager = HeartbeatManager.getInstance(this); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 260e5a477f..9f5773d587 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -58,7 +58,7 @@ public class InstanceManager extends AbstractDaemon { public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000; private long lastCleanTime = 0; - public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "-3"; + public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3"; // instance in db private final InstanceDb instanceDb; TaskProfileDb taskProfileDb; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java index 0c4ca513ce..fca223b873 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java @@ -17,38 +17,80 @@ package org.apache.inlong.agent.core.task; +import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.db.InstanceDb; import org.apache.inlong.agent.db.OffsetDb; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** * used to store instance offset to db * where key is task id + read file name and value is instance offset */ -public class OffsetManager { +public class OffsetManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class); + public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000; private static volatile OffsetManager offsetManager = null; private final OffsetDb offsetDb; + // instance in db + private final InstanceDb instanceDb; + + private OffsetManager(Db offsetBasicDb, Db instanceBasicDb) { + this.offsetDb = new OffsetDb(offsetBasicDb); + instanceDb = new InstanceDb(instanceBasicDb); + } - private OffsetManager() { - this.offsetDb = new OffsetDb(); + /** + * thread for core thread. + * + * @return runnable profile. + */ + private Runnable coreThread() { + return () -> { + Thread.currentThread().setName("offset-manager-core"); + while (isRunnable()) { + try { + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + List<OffsetProfile> offsets = offsetDb.listAllOffsets(); + offsets.forEach(offset -> { + String taskId = offset.getTaskId(); + String instanceId = offset.getInstanceId(); + InstanceProfile instanceProfile = instanceDb.getInstance(taskId, instanceId); + if (instanceProfile == null) { + deleteOffset(taskId, instanceId); + LOGGER.info("instance not found, delete offset taskId {} instanceId {}", taskId, + instanceId); + } + }); + LOGGER.info("offsetManager running! offsets count {}", offsets.size()); + } catch (Throwable ex) { + LOGGER.error("offset-manager-core: ", ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + } + }; } /** * task position manager singleton, can only generated by agent manager */ - public static OffsetManager init() { + public static void init(Db offsetBasicDb, Db instanceBasicDb) { if (offsetManager == null) { synchronized (OffsetManager.class) { if (offsetManager == null) { - offsetManager = new OffsetManager(); + offsetManager = new OffsetManager(offsetBasicDb, instanceBasicDb); } } } - return offsetManager; } /** @@ -72,4 +114,14 @@ public class OffsetManager { public OffsetProfile getOffset(String taskId, String instanceId) { return offsetDb.getOffset(taskId, instanceId); } + + @Override + public void start() throws Exception { + submitWorker(coreThread()); + } + + @Override + public void stop() throws Exception { + + } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index 7027b798f8..e188e4f207 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -22,6 +22,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.TaskAction; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.RocksDbImp; @@ -131,6 +132,7 @@ public class TaskManager extends AbstractDaemon { agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE)); offsetBasicDb = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET)); + OffsetManager.init(offsetBasicDb, instanceBasicDb); this.runningPool = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, @@ -519,6 +521,7 @@ public class TaskManager extends AbstractDaemon { public void start() throws Exception { restoreFromDb(); submitWorker(coreThread()); + OffsetManager.getInstance().start(); } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java index 13aef15f9d..1477ebbbac 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -131,7 +131,7 @@ public class FileInstance extends Instance { } private void handleSourceDeleted() { - OffsetManager.init().deleteOffset(getTaskId(), getInstanceId()); + OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId()); profile.setState(InstanceStateEnum.DELETE); profile.setModifyTime(AgentUtils.getCurrentTime()); InstanceAction action = new InstanceAction(ActionType.DELETE, profile); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java index ad4f07258a..5a01f64fa9 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java @@ -182,7 +182,7 @@ public class ProxySink extends AbstractSink { fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes( StandardCharsets.UTF_8); sourceName = profile.getInstanceId(); - offsetManager = OffsetManager.init(); + offsetManager = OffsetManager.getInstance(); senderManager = new SenderManager(profile, inlongGroupId, sourceName); try { senderManager.Start(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 4b61d32fc4..7c719b1e47 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -142,7 +142,6 @@ public class LogFileSource extends AbstractSource { private boolean isRealTime = false; public LogFileSource() { - OffsetManager.init(); } @Override diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 3397f9f58a..be69fbcfea 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -19,8 +19,12 @@ package org.apache.inlong.agent.plugin.sources; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.file.MemoryManager; +import org.apache.inlong.agent.core.task.file.TaskManager; +import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; @@ -52,6 +56,10 @@ public class TestLogFileSource { private static final String[] check = {"hello line-end-symbol aa", "world line-end-symbol", "agent line-end-symbol"}; private static InstanceProfile instanceProfile; + // instance basic db + private static Db instanceBasicDb; + // offset basic db + private static Db offsetBasicDb; @BeforeClass public static void setup() { @@ -62,6 +70,10 @@ public class TestLogFileSource { TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); instanceProfile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime()); + instanceBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE); + offsetBasicDb = + TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET); + OffsetManager.init(offsetBasicDb, instanceBasicDb); } private LogFileSource getSource() {