This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new c86def6bf1 [INLONG-9580][Agent] Add unit testing to taskmanager to test their ability to recover tasks from DB (#9581)
c86def6bf1 is described below

commit c86def6bf187801b87dd8e2cdd8641034f3cb332
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Jan 17 18:34:08 2024 +0800

    [INLONG-9580][Agent] Add unit testing to taskmanager to test their ability to recover tasks from DB (#9581)
---
 .../inlong/agent/core/task/file/TaskManager.java   |  4 +-
 .../inlong/agent/core/task/TestTaskManager.java    | 44 +++++++++++++++++-----
 2 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 684600dbb2..d4853085de 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -418,13 +418,15 @@ public class TaskManager extends AbstractDaemon {
     }

     private void restoreFromDb() {
+        LOGGER.info("restore from db start");
         List<TaskProfile> taskProfileList = taskDb.getTasks();
         taskProfileList.forEach((profile) -> {
             if (profile.getState() == TaskStateEnum.RUNNING) {
-                LOGGER.info("restoreFromDb taskId {}", profile.getTaskId());
+                LOGGER.info("restore from db taskId {}", profile.getTaskId());
                 addToMemory(profile);
             }
         });
+        LOGGER.info("restore from db end");
     }

     private void stopAllTasks() {
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
index 4fff284a80..08e87086e2 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.core.task;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.core.AgentBaseTestsHelper;
 import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.common.enums.TaskStateEnum;

 import org.junit.AfterClass;
@@ -44,24 +45,37 @@ public class TestTaskManager {
     @BeforeClass
     public static void setup() {
         helper = new AgentBaseTestsHelper(TestTaskManager.class.getName()).setupAgentHome();
-        try {
-            manager = new TaskManager();
-            manager.start();
-        } catch (Exception e) {
-            Assert.assertTrue("manager start error", false);
-        }
     }

     @AfterClass
     public static void teardown() throws Exception {
-        manager.stop();
-        helper.teardownAgentHome();
     }

     @Test
     public void testTaskManager() {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
+        try {
+            manager = new TaskManager();
+            TaskProfileDb taskProfileDb = manager.getTaskDb();
+            for (int i = 1; i <= 10; i++) {
+                TaskProfile taskProfile = helper.getTaskProfile(i, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
+                        "GMT+8:00");
+                taskProfile.setTaskClass(MockTask.class.getCanonicalName());
+                taskProfileDb.storeTask(taskProfile);
+            }
+            manager.start();
+            for (int i = 1; i <= 10; i++) {
+                String taskId = String.valueOf(i);
+                await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId) != null);
+                Assert.assertTrue(manager.getTask(taskId) != null);
+            }
+        } catch (Exception e) {
+            LOGGER.error("manager start error: ", e);
+            Assert.assertTrue("manager start error", false);
+        }
+
+        TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
+                "GMT+8:00");
         String taskId1 = taskProfile1.getTaskId();
         taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles1 = new ArrayList<>();
@@ -77,13 +91,16 @@ public class TestTaskManager {
         manager.submitTaskProfiles(taskProfiles1);
         await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) == null);
         Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.FROZEN);
+
+        // test restore from froze
         taskProfile1.setState(TaskStateEnum.RUNNING);
         manager.submitTaskProfiles(taskProfiles1);
         await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) != null);
         Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING);

         // test delete
-        TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
+        TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
+                "GMT+8:00");
         taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles2 = new ArrayList<>();
         taskProfiles2.add(taskProfile2);
@@ -93,5 +110,12 @@ public class TestTaskManager {
         String taskId2 = taskProfile2.getTaskId();
         await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId2) != null);
         Assert.assertTrue(manager.getTaskProfile(taskId2).getState() == TaskStateEnum.RUNNING);
+
+        try {
+            manager.stop();
+            helper.teardownAgentHome();
+        } catch (Exception e) {
+            Assert.assertTrue("manager stop error", false);
+        }
     }
 }