This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f62b2e87a1 [INLONG-9364][Agent] Remove expired instance from db (#9365) f62b2e87a1 is described below commit f62b2e87a12d5c7c2d75417715d3edb422230dab Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Nov 30 11:16:23 2023 +0800 [INLONG-9364][Agent] Remove expired instance from db (#9365) --- .../apache/inlong/agent/utils/DateTransUtils.java | 40 ++++++++++++ .../agent/core/instance/InstanceManager.java | 74 ++++++++++++++++++---- .../inlong/agent/core/task/file/TaskManager.java | 22 +++++-- .../agent/core/instance/TestInstanceManager.java | 6 +- .../agent/plugin/task/filecollect/FileScanner.java | 4 +- .../task/filecollect/LogFileCollectTask.java | 6 +- .../agent/plugin/utils/file/NewDateUtils.java | 44 +------------ .../inlong/agent/plugin/utils/TestUtils.java | 14 ++-- .../apache/inlong/common/enums/TaskStateEnum.java | 4 +- 9 files changed, 139 insertions(+), 75 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java index 55182c7dd8..fe6257d64e 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -111,4 +111,44 @@ public class DateTransUtils { return retTime; } + /** + * Calculate offset time based on offset + * The current offset will only be offset forward, or it can be offset backward to be compatible with the previous + * calculation method (subtraction). + * When it is offset backward, it returns negative; + * When offset forward, return positive + * + * @param timeOffset offset,such as -1d,-4h,-10m; + * @return + */ + public static long calcOffset(String timeOffset) { + if (timeOffset.length() == 0) { + return 0; + } + String offsetUnit = timeOffset.substring(timeOffset.length() - 1); + int startIndex; + int symbol; + if (timeOffset.charAt(0) == '-') { + symbol = -1; + startIndex = 1; + } else { + symbol = 1; + startIndex = 0; + } + + String strOffset = timeOffset.substring(startIndex, timeOffset.length() - 1); + if (strOffset.length() == 0) { + return 0; + } + int offsetTime = Integer.parseInt(strOffset); + if ("d".equalsIgnoreCase(offsetUnit)) { + return offsetTime * 24 * 3600 * 1000 * symbol; + } else if ("h".equalsIgnoreCase(offsetUnit)) { + return offsetTime * 3600 * 1000 * symbol; + } else if ("m".equalsIgnoreCase(offsetUnit)) { + return offsetTime * 60 * 1000 * symbol; + } + return 0; + } + } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index a80ce8b53b..260e5a477f 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -21,16 +21,22 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.InstanceDb; +import org.apache.inlong.agent.db.TaskProfileDb; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.enums.InstanceStateEnum; +import org.apache.inlong.common.enums.TaskStateEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +44,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * handle the instance created by task, including add, delete, update etc. @@ -47,11 +54,14 @@ public class InstanceManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceManager.class); private static final int ACTION_QUEUE_CAPACITY = 100; + public static final int CLEAN_INSTANCE_ONCE_LIMIT = 10; public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; - public static final int CORE_THREAD_PRINT_TIME = 10000; - private long lastPrintTime = 0; - // task in db + public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000; + private long lastCleanTime = 0; + public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "-3"; + // instance in db private final InstanceDb instanceDb; + TaskProfileDb taskProfileDb; // task in memory private final ConcurrentHashMap<String, Instance> instanceMap; // instance profile queue. @@ -105,9 +115,10 @@ public class InstanceManager extends AbstractDaemon { /** * Init task manager. */ - public InstanceManager(String taskId, int instanceLimit, Db basicDb) { + public InstanceManager(String taskId, int instanceLimit, Db basicDb, TaskProfileDb taskProfileDb) { this.taskId = taskId; instanceDb = new InstanceDb(basicDb); + this.taskProfileDb = taskProfileDb; this.agentConf = AgentConfiguration.getAgentConf(); instanceMap = new ConcurrentHashMap<>(); this.instanceLimit = instanceLimit; @@ -145,11 +156,11 @@ public class InstanceManager extends AbstractDaemon { while (isRunnable()) { try { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); - printInstanceDetail(); + cleanDbInstance(); dealWithActionQueue(actionQueue); keepPaceWithDb(); } catch (Throwable ex) { - LOGGER.error("coreThread {}", ex.getMessage()); + LOGGER.error("coreThread {}", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); } runAtLeastOneTime = true; @@ -158,9 +169,10 @@ public class InstanceManager extends AbstractDaemon { }; } - private void printInstanceDetail() { - if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { + private void cleanDbInstance() { + if (AgentUtils.getCurrentTime() - lastCleanTime > INSTANCE_DB_CLEAN_INTERVAL_MS) { List<InstanceProfile> instances = instanceDb.getInstances(taskId); + doCleanDbInstance(instances); InstancePrintStat stat = new InstancePrintStat(); for (int i = 0; i < instances.size(); i++) { InstanceProfile instance = instances.get(i); @@ -169,7 +181,45 @@ public class InstanceManager extends AbstractDaemon { LOGGER.info( "instanceManager running! taskId {} mem {} db total {} {} action count {}", taskId, instanceMap.size(), instances.size(), stat, actionQueue.size()); - lastPrintTime = AgentUtils.getCurrentTime(); + lastCleanTime = AgentUtils.getCurrentTime(); + } + } + + private void doCleanDbInstance(List<InstanceProfile> instances) { + AtomicInteger cleanCount = new AtomicInteger(); + Iterator<InstanceProfile> iterator = instances.iterator(); + while (iterator.hasNext()) { + if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) { + return; + } + InstanceProfile instanceFromDb = iterator.next(); + if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) { + return; + } + TaskProfile taskFromDb = taskProfileDb.getTask(taskId); + if (taskFromDb != null) { + if (taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { + return; + } + if (taskFromDb.isRetry()) { + if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) { + return; + } + } else { + if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) { + return; + } + } + } + long expireTime = DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + taskFromDb.getCycleUnit()); + if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) { + cleanCount.getAndIncrement(); + LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}", + instanceFromDb.getSourceDataTime(), instanceFromDb.getTaskId(), + instanceFromDb.getInstanceId()); + instanceDb.deleteInstance(instanceFromDb.getTaskId(), instanceFromDb.getInstanceId()); + iterator.remove(); + } } } @@ -181,10 +231,10 @@ public class InstanceManager extends AbstractDaemon { private void traverseDbTasksToMemory() { instanceDb.getInstances(taskId).forEach((profileFromDb) -> { InstanceStateEnum dbState = profileFromDb.getState(); - Instance task = instanceMap.get(profileFromDb.getInstanceId()); + Instance instance = instanceMap.get(profileFromDb.getInstanceId()); switch (dbState) { case DEFAULT: { - if (task == null) { + if (instance == null) { LOGGER.info("traverseDbTasksToMemory add instance to mem taskId {} instanceId {}", profileFromDb.getTaskId(), profileFromDb.getInstanceId()); addToMemory(profileFromDb); @@ -193,7 +243,7 @@ public class InstanceManager extends AbstractDaemon { } case FINISHED: DELETE: { - if (task != null) { + if (instance != null) { LOGGER.info("traverseDbTasksToMemory delete instance from mem taskId {} instanceId {}", profileFromDb.getTaskId(), profileFromDb.getInstanceId()); deleteFromMemory(profileFromDb.getInstanceId()); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index 4d8bc9fea6..7027b798f8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -63,6 +63,8 @@ public class TaskManager extends AbstractDaemon { private final Db taskBasicDb; // instance basic db private final Db instanceBasicDb; + // offset basic db + private final Db offsetBasicDb; // task in db private final TaskProfileDb taskDb; // task in memory @@ -100,7 +102,7 @@ public class TaskManager extends AbstractDaemon { frozenCount++; break; } - case FINISH: { + case RETRY_FINISH: { finishedCount++; break; } @@ -122,11 +124,13 @@ public class TaskManager extends AbstractDaemon { */ public TaskManager() { this.agentConf = AgentConfiguration.getAgentConf(); - this.taskBasicDb = initDb( + taskBasicDb = initDb( agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_TASK)); - this.instanceBasicDb = initDb( - agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE)); taskDb = new TaskProfileDb(taskBasicDb); + instanceBasicDb = initDb( + agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE)); + offsetBasicDb = + initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET)); this.runningPool = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, @@ -139,6 +143,10 @@ public class TaskManager extends AbstractDaemon { actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); } + public TaskProfileDb getTaskDb() { + return taskDb; + } + /** * init db by class name * @@ -284,7 +292,7 @@ public class TaskManager extends AbstractDaemon { if (managerState == dbState) { return; } - if (dbState == TaskStateEnum.FINISH) { + if (dbState == TaskStateEnum.RETRY_FINISH) { LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, do nothing", taskFromDb.getTaskId(), dbState, taskFromDb.isRetry()); @@ -335,7 +343,7 @@ public class TaskManager extends AbstractDaemon { deleteFromMemory(profileFromDb.getTaskId()); } } else { - if (dbState != TaskStateEnum.FINISH) { + if (dbState != TaskStateEnum.RETRY_FINISH) { LOGGER.error("task {} invalid state {}", profileFromDb.getTaskId(), dbState); } } @@ -394,7 +402,7 @@ public class TaskManager extends AbstractDaemon { } private void finishTask(TaskProfile taskProfile) { - taskProfile.setState(TaskStateEnum.FINISH); + taskProfile.setState(TaskStateEnum.RETRY_FINISH); updateToDb(taskProfile); deleteFromMemory(taskProfile.getTaskId()); } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index 558bed0204..262565022e 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -19,9 +19,11 @@ package org.apache.inlong.agent.core.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.core.AgentBaseTestsHelper; import org.apache.inlong.agent.core.task.file.TaskManager; import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.db.TaskProfileDb; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.enums.InstanceStateEnum; @@ -53,7 +55,9 @@ public class TestInstanceManager { String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt"; Db basicDb = TaskManager.initDb("/localdb"); taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00"); - manager = new InstanceManager("1", 2, basicDb); + Db taskBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK); + TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb); + manager = new InstanceManager("1", 2, basicDb, taskDb); manager.CORE_THREAD_SLEEP_TIME_MS = 100; manager.start(); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java index fc989b3bcf..9985214873 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java @@ -61,8 +61,8 @@ public class FileScanner { long startTime, long endTime, boolean isRetry) { if (!isRetry) { - startTime += NewDateUtils.calcOffset(timeOffset); - endTime += NewDateUtils.calcOffset(timeOffset); + startTime += DateTransUtils.calcOffset(timeOffset); + endTime += DateTransUtils.calcOffset(timeOffset); } String strStartTime = DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit); String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, cycleUnit); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 355bf3909f..86ce040210 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -34,6 +34,7 @@ import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; import org.apache.inlong.agent.state.State; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.agent.utils.file.FileUtils; import org.slf4j.Logger; @@ -70,6 +71,7 @@ public class LogFileCollectTask extends Task { public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance"; private static final Logger LOGGER = LoggerFactory.getLogger(LogFileCollectTask.class); + public static final String SCAN_CYCLE_RANCE = "-2"; private TaskProfile taskProfile; private Db basicDb; private TaskManager taskManager; @@ -117,7 +119,7 @@ public class LogFileCollectTask extends Task { isRealTime = true; } instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), - basicDb); + basicDb, taskManager.getTaskDb()); try { instanceManager.start(); } catch (Exception e) { @@ -318,7 +320,7 @@ public class LogFileCollectTask extends Task { if (!retry) { long currentTime = System.currentTimeMillis(); // only scan two cycle, like two hours or two days - long offset = NewDateUtils.calcOffset("-2" + taskProfile.getCycleUnit()); + long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + taskProfile.getCycleUnit()); startScanTime = currentTime + offset; endScanTime = currentTime; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index 706167788f..c38eb57be8 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -223,54 +223,14 @@ public class NewDateUtils { } if (timeOffset.startsWith("-")) { - timeInterval -= calcOffset(timeOffset); + timeInterval -= DateTransUtils.calcOffset(timeOffset); } else { - timeInterval += calcOffset(timeOffset); + timeInterval += DateTransUtils.calcOffset(timeOffset); } return isValidCreationTime(dataTime, timeInterval); } - /** - * Calculate offset time based on offset - * The current offset will only be offset forward, or it can be offset backward to be compatible with the previous - * calculation method (subtraction). - * When it is offset backward, it returns negative; - * When offset forward, return positive - * - * @param timeOffset offset,such as -1d,-4h,-10m; - * @return - */ - public static long calcOffset(String timeOffset) { - if (timeOffset.length() == 0) { - return 0; - } - String offsetUnit = timeOffset.substring(timeOffset.length() - 1); - int startIndex; - int symbol; - if (timeOffset.charAt(0) == '-') { - symbol = -1; - startIndex = 1; - } else { - symbol = 1; - startIndex = 0; - } - - String strOffset = timeOffset.substring(startIndex, timeOffset.length() - 1); - if (strOffset.length() == 0) { - return 0; - } - int offsetTime = Integer.parseInt(strOffset); - if ("d".equalsIgnoreCase(offsetUnit)) { - return offsetTime * 24 * 3600 * 1000 * symbol; - } else if ("h".equalsIgnoreCase(offsetUnit)) { - return offsetTime * 3600 * 1000 * symbol; - } else if ("m".equalsIgnoreCase(offsetUnit)) { - return offsetTime * 60 * 1000 * symbol; - } - return 0; - } - /* * Check whether the data time is between curTime - interval and curTime + interval. */ diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java index 46860ae20a..ea575e613d 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java @@ -17,7 +17,7 @@ package org.apache.inlong.agent.plugin.utils; -import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.metric.MetricRegister; import org.apache.commons.io.FileUtils; @@ -46,12 +46,12 @@ public class TestUtils { @Test public void testCalcOffset() { - Assert.assertTrue(NewDateUtils.calcOffset("-1h") == -3600 * 1000); - Assert.assertTrue(NewDateUtils.calcOffset("1D") == 24 * 3600 * 1000); - Assert.assertTrue(NewDateUtils.calcOffset("0") == 0); - Assert.assertTrue(NewDateUtils.calcOffset("1") == 0); - Assert.assertTrue(NewDateUtils.calcOffset("10") == 0); - Assert.assertTrue(NewDateUtils.calcOffset("") == 0); + Assert.assertTrue(DateTransUtils.calcOffset("-1h") == -3600 * 1000); + Assert.assertTrue(DateTransUtils.calcOffset("1D") == 24 * 3600 * 1000); + Assert.assertTrue(DateTransUtils.calcOffset("0") == 0); + Assert.assertTrue(DateTransUtils.calcOffset("1") == 0); + Assert.assertTrue(DateTransUtils.calcOffset("10") == 0); + Assert.assertTrue(DateTransUtils.calcOffset("") == 0); } public static String getTestTriggerProfile() { diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java index 6401fa8ffb..583604b32b 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java @@ -25,7 +25,7 @@ public enum TaskStateEnum { NEW(0), RUNNING(1), FROZEN(2), - FINISH(3); + RETRY_FINISH(3); private final int state; @@ -42,7 +42,7 @@ public enum TaskStateEnum { case 2: return FROZEN; case 3: - return FINISH; + return RETRY_FINISH; default: throw new RuntimeException("Unsupported task state " + state); }