This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a87e7e1a3d [INLONG-9582][Agent] Add unit testing to instance manager to test their ability to recover tasks from DB (#9583) a87e7e1a3d is described below commit a87e7e1a3da7d75c136bce2cbb5096d8d49e2262 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Jan 18 10:09:26 2024 +0800 [INLONG-9582][Agent] Add unit testing to instance manager to test their ability to recover tasks from DB (#9583) --- .../inlong/agent/core/instance/InstanceManager.java | 6 +++++- .../agent/core/instance/TestInstanceManager.java | 18 ++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 3b74cf4e48..e980354fe2 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -132,6 +132,10 @@ public class InstanceManager extends AbstractDaemon { return taskId; } + public InstanceDb getInstanceDb() { + return instanceDb; + } + public Instance getInstance(String instanceId) { return instanceMap.get(instanceId); } @@ -167,7 +171,7 @@ public class InstanceManager extends AbstractDaemon { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId, AgentUtils.getCurrentTime(), 1, 1); } catch (Throwable ex) { - LOGGER.error("coreThread {}", ex); + LOGGER.error("coreThread error: ", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); } runAtLeastOneTime = true; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index 34772636ad..8d267e71a1 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -23,6 +23,7 @@ import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.core.AgentBaseTestsHelper; import org.apache.inlong.agent.core.task.file.TaskManager; import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.db.InstanceDb; import org.apache.inlong.agent.db.TaskProfileDb; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.DateTransUtils; @@ -57,9 +58,9 @@ public class TestInstanceManager { taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00"); Db taskBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK); TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb); - manager = new InstanceManager("1", 2, basicDb, taskDb); + taskDb.storeTask(taskProfile); + manager = new InstanceManager("1", 20, basicDb, taskDb); manager.CORE_THREAD_SLEEP_TIME_MS = 100; - manager.start(); } @AfterClass @@ -70,6 +71,19 @@ public class TestInstanceManager { @Test public void testInstanceManager() { + InstanceDb instanceDb = manager.getInstanceDb(); + for (int i = 1; i <= 10; i++) { + InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), + String.valueOf(i), taskProfile.getCycleUnit(), "2023092710", + AgentUtils.getCurrentTime()); + instanceDb.storeInstance(profile); + } + manager.start(); + for (int i = 1; i <= 10; i++) { + String instanceId = String.valueOf(i); + await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstance(instanceId) != null); + Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == InstanceStateEnum.DEFAULT); + } long timeBefore = AgentUtils.getCurrentTime(); InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), helper.getTestRootDir() + "/2023092710_1.txt", taskProfile.getCycleUnit(), "2023092710",