This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ade12d2dd [INLONG-11516][Agent] Accelerate the process exit speed 
(#11517)
8ade12d2dd is described below

commit 8ade12d2dde155283675e78554eda2a53ca23b73
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 21 16:32:06 2024 +0800

    [INLONG-11516][Agent] Accelerate the process exit speed (#11517)
    
    * [INLONG-11516][Agent] Accelerate the process exit speed
    
    * [INLONG-11516][Agent] Add modifications to InstacneManager
---
 .../org/apache/inlong/agent/plugin/Instance.java   |  5 +++++
 .../agent/core/instance/InstanceManager.java       | 21 ++++++++++++++++++---
 .../agent/plugin/instance/CommonInstance.java      | 22 ++++++++++++++++++----
 .../plugin/sinks/filecollect/SenderManager.java    |  6 +++++-
 .../agent/plugin/sources/file/AbstractSource.java  | 14 +++++++-------
 .../inlong/agent/plugin/instance/MockInstance.java |  5 +++++
 .../agent/plugin/sources/TestLogFileSource.java    |  2 +-
 7 files changed, 59 insertions(+), 16 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
index 0d43587f6e..e67d543882 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
@@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper {
      */
     public abstract void destroy();
 
+    /**
+     * notify destroy instance.
+     */
+    public abstract void notifyDestroy();
+
     /**
      * get instance profile
      */
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 06dd20a99e..2a27e792e8 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -147,6 +147,9 @@ public class InstanceManager extends AbstractDaemon {
         if (action == null) {
             return false;
         }
+        if (isFull()) {
+            return false;
+        }
         return actionQueue.offer(action);
     }
 
@@ -163,7 +166,7 @@ public class InstanceManager extends AbstractDaemon {
                 try {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
                     printInstanceState();
-                    dealWithActionQueue(actionQueue);
+                    dealWithActionQueue();
                     keepPaceWithStore();
                     String inlongGroupId = taskFromStore.getInlongGroupId();
                     String inlongStreamId = taskFromStore.getInlongStreamId();
@@ -251,10 +254,10 @@ public class InstanceManager extends AbstractDaemon {
         });
     }
 
-    private void dealWithActionQueue(BlockingQueue<InstanceAction> queue) {
+    private void dealWithActionQueue() {
         while (isRunnable()) {
             try {
-                InstanceAction action = queue.poll();
+                InstanceAction action = actionQueue.poll();
                 if (action == null) {
                     break;
                 }
@@ -375,6 +378,15 @@ public class InstanceManager extends AbstractDaemon {
                 instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
     }
 
+    private void notifyDestroyInstance(String instanceId) {
+        Instance instance = instanceMap.get(instanceId);
+        if (instance == null) {
+            LOGGER.error("try to notify destroy instance but not found: taskId 
{} instanceId {}", taskId, instanceId);
+            return;
+        }
+        instance.notifyDestroy();
+    }
+
     private void addToStore(InstanceProfile profile, boolean addNew) {
         LOGGER.info("add instance to instance store state {} instanceId {}", 
profile.getState(),
                 profile.getInstanceId());
@@ -433,6 +445,9 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void stopAllInstances() {
+        instanceMap.values().forEach((instance) -> {
+            notifyDestroyInstance(instance.getInstanceId());
+        });
         instanceMap.values().forEach((instance) -> {
             deleteFromMemory(instance.getInstanceId());
         });
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 415b05825a..7267066aee 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -95,15 +95,29 @@ public abstract class CommonInstance extends Instance {
 
     @Override
     public void destroy() {
-        if (!inited) {
-            return;
-        }
-        doChangeState(State.SUCCEEDED);
+        Long start = AgentUtils.getCurrentTime();
+        notifyDestroy();
         while (running) {
             AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
         }
+        LOGGER.info("destroy instance wait run elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         this.source.destroy();
+        LOGGER.info("destroy instance wait source elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+        start = AgentUtils.getCurrentTime();
         this.sink.destroy();
+        LOGGER.info("destroy instance wait sink elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
+    }
+
+    @Override
+    public void notifyDestroy() {
+        if (!inited) {
+            return;
+        }
+        doChangeState(State.SUCCEEDED);
     }
 
     @Override
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index a37a171a37..ec4502a7fb 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -70,6 +70,7 @@ public class SenderManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
     private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance();
+    public static final int RESEND_QUEUE_WAIT_MS = 10;
     // cache for group and sender list, share the map cross agent lifecycle.
     private DefaultMessageSender sender;
     private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
@@ -172,9 +173,12 @@ public class SenderManager {
     }
 
     private void closeMessageSender() {
+        Long start = AgentUtils.getCurrentTime();
         if (sender != null) {
             sender.close();
         }
+        LOGGER.info("close sender elapse {} ms instance {}", 
AgentUtils.getCurrentTime() - start,
+                profile.getInstanceId());
     }
 
     private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) 
{
@@ -286,7 +290,7 @@ public class SenderManager {
             resendRunning = true;
             while (!shutdown) {
                 try {
-                    AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
+                    AgentSenderCallback callback = 
resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS);
                     if (callback != null) {
                         SenderMessage message = callback.message;
                         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, 
message.getGroupId(),
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 8929b33d01..803b9235d2 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -80,8 +80,8 @@ public abstract class AbstractSource implements Source {
     protected final Integer BATCH_READ_LINE_COUNT = 10000;
     protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
     protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
-    protected final Integer READ_WAIT_TIMEOUT_MS = 10;
-    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+    protected final Integer WAIT_TIMEOUT_MS = 10;
+    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
     private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
     protected BlockingQueue<SourceData> queue;
 
@@ -172,7 +172,7 @@ public abstract class AbstractSource implements Source {
                     emptyCount = 0;
                 }
                 
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-                AgentUtils.silenceSleepInSeconds(1);
+                AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
                 continue;
             }
             emptyCount = 0;
@@ -231,7 +231,7 @@ public abstract class AbstractSource implements Source {
                 if (!isRunnable()) {
                     return false;
                 }
-                AgentUtils.silenceSleepInSeconds(1);
+                AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
             }
         }
         return true;
@@ -247,7 +247,7 @@ public abstract class AbstractSource implements Source {
         try {
             boolean offerSuc = false;
             while (isRunnable() && !offerSuc) {
-                offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+                offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
             }
             if (!offerSuc) {
                 
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.getData().length);
@@ -338,7 +338,7 @@ public abstract class AbstractSource implements Source {
     private SourceData readFromQueue() {
         SourceData sourceData = null;
         try {
-            sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             LOGGER.warn("poll {} data get interrupted.", instanceId);
         }
@@ -405,7 +405,7 @@ public abstract class AbstractSource implements Source {
         while (queue != null && !queue.isEmpty()) {
             SourceData sourceData = null;
             try {
-                sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+                sourceData = queue.poll(WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 LOGGER.warn("poll {} data get interrupted.", instanceId, e);
             }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
index d3dc67df5c..278a9298f9 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java
@@ -55,6 +55,11 @@ public class MockInstance extends Instance {
         destroyTime = index.getAndAdd(1);
     }
 
+    @Override
+    public void notifyDestroy() {
+
+    }
+
     @Override
     public InstanceProfile getProfile() {
         return profile;
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 6ee892c914..5d6871fecb 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -90,7 +90,7 @@ public class TestLogFileSource {
             Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 
0);
             Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 
2);
             Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
-            Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
+            Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
             if (offset > 0) {
                 OffsetProfile offsetProfile = new 
OffsetProfile(instanceProfile.getTaskId(),
                         instanceProfile.getInstanceId(),

Reply via email to