This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e87f3605d4 [INLONG-9366][Agent] Remove useless offset record (#9367)
e87f3605d4 is described below

commit e87f3605d485b14a4c548de3afcb1c375650530d
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 30 14:05:42 2023 +0800

    [INLONG-9366][Agent] Remove useless offset record (#9367)
---
 .../java/org/apache/inlong/agent/db/OffsetDb.java  | 20 +++++--
 .../org/apache/inlong/agent/core/AgentManager.java |  2 -
 .../agent/core/instance/InstanceManager.java       |  2 +-
 .../inlong/agent/core/task/OffsetManager.java      | 64 ++++++++++++++++++++--
 .../inlong/agent/core/task/file/TaskManager.java   |  3 +
 .../inlong/agent/plugin/instance/FileInstance.java |  2 +-
 .../agent/plugin/sinks/filecollect/ProxySink.java  |  2 +-
 .../inlong/agent/plugin/sources/LogFileSource.java |  1 -
 .../agent/plugin/sources/TestLogFileSource.java    | 12 ++++
 9 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
index 5ceeb2e4ea..5c31a2f88a 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
@@ -17,9 +17,7 @@
 
 package org.apache.inlong.agent.db;
 
-import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.OffsetProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -27,6 +25,9 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * db interface for task profile.
  */
@@ -34,11 +35,9 @@ public class OffsetDb {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(OffsetDb.class);
     private final Db db;
-    private final AgentConfiguration agentConf;
 
-    public OffsetDb() {
-        agentConf = AgentConfiguration.getAgentConf();
-        db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+    public OffsetDb(Db db) {
+        this.db = db;
     }
 
     /**
@@ -54,6 +53,15 @@ public class OffsetDb {
         }
     }
 
+    public List<OffsetProfile> listAllOffsets() {
+        List<KeyValueEntity> result = this.db.findAll("");
+        List<OffsetProfile> offsetList = new ArrayList<>();
+        for (KeyValueEntity entity : result) {
+            offsetList.add(entity.getAsOffsetProfile());
+        }
+        return offsetList;
+    }
+
     public OffsetProfile getOffset(String taskId, String instanceId) {
         KeyValueEntity result = db.get(getKey(taskId, instanceId));
         if (result == null) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 5c28e172b6..29c000dcc4 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.ProfileFetcher;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.file.TaskManager;
 
 import org.slf4j.Logger;
@@ -48,7 +47,6 @@ public class AgentManager extends AbstractDaemon {
     public AgentManager() {
         conf = AgentConfiguration.getAgentConf();
         agentConfMonitor = Executors.newSingleThreadExecutor();
-        OffsetManager.init();
         taskManager = new TaskManager();
         fetcher = initFetcher(this);
         heartbeatManager = HeartbeatManager.getInstance(this);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 260e5a477f..9f5773d587 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -58,7 +58,7 @@ public class InstanceManager extends AbstractDaemon {
     public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
     public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000;
     private long lastCleanTime = 0;
-    public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "-3";
+    public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
     // instance in db
     private final InstanceDb instanceDb;
     TaskProfileDb taskProfileDb;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 0c4ca513ce..fca223b873 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -17,38 +17,80 @@
 
 package org.apache.inlong.agent.core.task;
 
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.InstanceDb;
 import org.apache.inlong.agent.db.OffsetDb;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * used to store instance offset to db
  * where key is task id + read file name and value is instance offset
  */
-public class OffsetManager {
+public class OffsetManager extends AbstractDaemon {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
+    public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
     private static volatile OffsetManager offsetManager = null;
     private final OffsetDb offsetDb;
+    // instance in db
+    private final InstanceDb instanceDb;
+
+    private OffsetManager(Db offsetBasicDb, Db instanceBasicDb) {
+        this.offsetDb = new OffsetDb(offsetBasicDb);
+        instanceDb = new InstanceDb(instanceBasicDb);
+    }
 
-    private OffsetManager() {
-        this.offsetDb = new OffsetDb();
+    /**
+     * thread for core thread.
+     *
+     * @return runnable profile.
+     */
+    private Runnable coreThread() {
+        return () -> {
+            Thread.currentThread().setName("offset-manager-core");
+            while (isRunnable()) {
+                try {
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    List<OffsetProfile> offsets = offsetDb.listAllOffsets();
+                    offsets.forEach(offset -> {
+                        String taskId = offset.getTaskId();
+                        String instanceId = offset.getInstanceId();
+                        InstanceProfile instanceProfile = instanceDb.getInstance(taskId, instanceId);
+                        if (instanceProfile == null) {
+                            deleteOffset(taskId, instanceId);
+                            LOGGER.info("instance not found, delete offset taskId {} instanceId {}", taskId,
+                                    instanceId);
+                        }
+                    });
+                    LOGGER.info("offsetManager running! offsets count {}", offsets.size());
+                } catch (Throwable ex) {
+                    LOGGER.error("offset-manager-core: ", ex);
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+                }
+            }
+        };
     }
 
     /**
      * task position manager singleton, can only generated by agent manager
      */
-    public static OffsetManager init() {
+    public static void init(Db offsetBasicDb, Db instanceBasicDb) {
         if (offsetManager == null) {
             synchronized (OffsetManager.class) {
                 if (offsetManager == null) {
-                    offsetManager = new OffsetManager();
+                    offsetManager = new OffsetManager(offsetBasicDb, instanceBasicDb);
                 }
             }
         }
-        return offsetManager;
     }
 
     /**
@@ -72,4 +114,14 @@ public class OffsetManager {
     public OffsetProfile getOffset(String taskId, String instanceId) {
         return offsetDb.getOffset(taskId, instanceId);
     }
+
+    @Override
+    public void start() throws Exception {
+        submitWorker(coreThread());
+    }
+
+    @Override
+    public void stop() throws Exception {
+
+    }
 }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 7027b798f8..e188e4f207 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskAction;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.RocksDbImp;
@@ -131,6 +132,7 @@ public class TaskManager extends AbstractDaemon {
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
         offsetBasicDb =
                 initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+        OffsetManager.init(offsetBasicDb, instanceBasicDb);
         this.runningPool = new ThreadPoolExecutor(
                 0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
@@ -519,6 +521,7 @@ public class TaskManager extends AbstractDaemon {
     public void start() throws Exception {
         restoreFromDb();
         submitWorker(coreThread());
+        OffsetManager.getInstance().start();
     }
 
     @Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 13aef15f9d..1477ebbbac 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -131,7 +131,7 @@ public class FileInstance extends Instance {
     }
 
     private void handleSourceDeleted() {
-        OffsetManager.init().deleteOffset(getTaskId(), getInstanceId());
+        OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
         profile.setState(InstanceStateEnum.DELETE);
         profile.setModifyTime(AgentUtils.getCurrentTime());
         InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index ad4f07258a..5a01f64fa9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -182,7 +182,7 @@ public class ProxySink extends AbstractSink {
         fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes(
                 StandardCharsets.UTF_8);
         sourceName = profile.getInstanceId();
-        offsetManager = OffsetManager.init();
+        offsetManager = OffsetManager.getInstance();
         senderManager = new SenderManager(profile, inlongGroupId, sourceName);
         try {
             senderManager.Start();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 4b61d32fc4..7c719b1e47 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -142,7 +142,6 @@ public class LogFileSource extends AbstractSource {
     private boolean isRealTime = false;
 
     public LogFileSource() {
-        OffsetManager.init();
     }
 
     @Override
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 3397f9f58a..be69fbcfea 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -19,8 +19,12 @@ package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.file.MemoryManager;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
@@ -52,6 +56,10 @@ public class TestLogFileSource {
     private static final String[] check = {"hello line-end-symbol aa", "world line-end-symbol",
             "agent line-end-symbol"};
     private static InstanceProfile instanceProfile;
+    // instance basic db
+    private static Db instanceBasicDb;
+    // offset basic db
+    private static Db offsetBasicDb;
 
     @BeforeClass
     public static void setup() {
@@ -62,6 +70,10 @@ public class TestLogFileSource {
         TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
         instanceProfile = taskProfile.createInstanceProfile("",
                 fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime());
+        instanceBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
+        offsetBasicDb =
+                TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
+        OffsetManager.init(offsetBasicDb, instanceBasicDb);
     }
 
     private LogFileSource getSource() {

Reply via email to