This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4552466dfe [INLONG-9241][Agent] Print task and instance detail every ten seconds (#9243)
4552466dfe is described below

commit 4552466dfefed046a2188571a3213570244c7d9e
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 9 09:57:24 2023 +0800

    [INLONG-9241][Agent] Print task and instance detail every ten seconds (#9243)
---
 .../agent/core/instance/InstanceManager.java       | 28 ++++++++++++++++++----
 .../inlong/agent/core/task/file/TaskManager.java   | 17 +++++++++++++
 .../agent/core/instance/TestInstanceManager.java   |  1 +
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index de96b93bc1..d4a278a974 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -47,7 +47,9 @@ public class InstanceManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceManager.class);
     private static final int ACTION_QUEUE_CAPACITY = 100;
-    public static final int CORE_THREAD_SLEEP_TIME = 100;
+    public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
+    public static final int CORE_THREAD_PRINT_TIME = 10000;
+    private long lastPrintTime = 0;
     // task in db
     private final InstanceDb instanceDb;
     // task in memory
@@ -109,7 +111,8 @@ public class InstanceManager extends AbstractDaemon {
             running = true;
             while (isRunnable()) {
                 try {
-                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
+                    printInstanceDetail();
                     dealWithActionQueue(actionQueue);
                     keepPaceWithDb();
                 } catch (Throwable ex) {
@@ -122,6 +125,22 @@ public class InstanceManager extends AbstractDaemon {
         };
     }
 
+    private void printInstanceDetail() {
+        if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
+            LOGGER.info("instanceManager coreThread running! taskId {} action 
count {}", taskId,
+                    actionQueue.size());
+            List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+            for (int i = 0; i < instances.size(); i++) {
+                InstanceProfile instance = instances.get(i);
+                LOGGER.info(
+                        "instanceManager coreThread instance taskId {} index 
{} total {} instanceId {} state {}",
+                        taskId, i,
+                        instances.size(), instance.getInstanceId(), 
instance.getState());
+            }
+            lastPrintTime = AgentUtils.getCurrentTime();
+        }
+    }
+
     private void keepPaceWithDb() {
         traverseDbTasksToMemory();
         traverseMemoryTasksToDb();
@@ -215,7 +234,7 @@ public class InstanceManager extends AbstractDaemon {
     public void waitForTerminate() {
         super.waitForTerminate();
         while (running) {
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
         }
     }
 
@@ -241,7 +260,8 @@ public class InstanceManager extends AbstractDaemon {
         }
         LOGGER.info("add instance taskId {} instanceId {}", taskId, 
profile.getInstanceId());
         if (!shouldAddAgain(profile.getInstanceId(), 
profile.getFileUpdateTime())) {
-            LOGGER.info("shouldAddAgain returns false skip taskId {} 
instanceId {}", taskId, profile.getInstanceId());
+            LOGGER.info("addInstance shouldAddAgain returns false skip taskId 
{} instanceId {}", taskId,
+                    profile.getInstanceId());
             return;
         }
         addToDb(profile);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 9aa71a96ab..b4cb79a48c 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -56,7 +56,9 @@ public class TaskManager extends AbstractDaemon {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskManager.class);
     public static final int CONFIG_QUEUE_CAPACITY = 1;
     public static final int CORE_THREAD_SLEEP_TIME = 1000;
+    public static final int CORE_THREAD_PRINT_TIME = 10000;
     private static final int ACTION_QUEUE_CAPACITY = 100000;
+    private long lastPrintTime = 0;
     // task basic db
     private final Db taskBasicDb;
     // instance basic db
@@ -143,6 +145,7 @@ public class TaskManager extends AbstractDaemon {
             while (isRunnable()) {
                 try {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    printTaskDetail();
                     dealWithConfigQueue(configQueue);
                     dealWithActionQueue(actionQueue);
                 } catch (Throwable ex) {
@@ -153,6 +156,20 @@ public class TaskManager extends AbstractDaemon {
         };
     }
 
+    private void printTaskDetail() {
+        if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
+            LOGGER.info("taskManager coreThread running!");
+            List<TaskProfile> tasks = taskDb.getTasks();
+            for (int i = 0; i < tasks.size(); i++) {
+                TaskProfile task = tasks.get(i);
+                LOGGER.info("taskManager coreThread task index {} total {} 
taskId {} state {}",
+                        i, tasks.size(), task.getTaskId(), task.getState());
+            }
+            lastPrintTime = AgentUtils.getCurrentTime();
+        }
+
+    }
+
     private void dealWithConfigQueue(BlockingQueue<List<TaskProfile>> queue) {
         List<TaskProfile> dataConfigs = queue.poll();
         if (dataConfigs == null) {
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index b1107bb2ab..cff9a7c243 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -51,6 +51,7 @@ public class TestInstanceManager {
         Db basicDb = TaskManager.initDb("/localdb");
         taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING);
         manager = new InstanceManager("1", 2, basicDb);
+        manager.CORE_THREAD_SLEEP_TIME_MS = 100;
         manager.start();
     }
 

Reply via email to