This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8ade12d2dd [INLONG-11516][Agent] Accelerate the process exit speed (#11517) 8ade12d2dd is described below commit 8ade12d2dde155283675e78554eda2a53ca23b73 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Nov 21 16:32:06 2024 +0800 [INLONG-11516][Agent] Accelerate the process exit speed (#11517) * [INLONG-11516][Agent] Accelerate the process exit speed * [INLONG-11516][Agent] Add modifications to InstanceManager --- .../org/apache/inlong/agent/plugin/Instance.java | 5 +++++ .../agent/core/instance/InstanceManager.java | 21 ++++++++++++++++++--- .../agent/plugin/instance/CommonInstance.java | 22 ++++++++++++++++++---- .../plugin/sinks/filecollect/SenderManager.java | 6 +++++- .../agent/plugin/sources/file/AbstractSource.java | 14 +++++++------- .../inlong/agent/plugin/instance/MockInstance.java | 5 +++++ .../agent/plugin/sources/TestLogFileSource.java | 2 +- 7 files changed, 59 insertions(+), 16 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java index 0d43587f6e..e67d543882 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java @@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper { */ public abstract void destroy(); + /** + * notify destroy instance. 
+ */ + public abstract void notifyDestroy(); + /** * get instance profile */ diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 06dd20a99e..2a27e792e8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -147,6 +147,9 @@ public class InstanceManager extends AbstractDaemon { if (action == null) { return false; } + if (isFull()) { + return false; + } return actionQueue.offer(action); } @@ -163,7 +166,7 @@ public class InstanceManager extends AbstractDaemon { try { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); printInstanceState(); - dealWithActionQueue(actionQueue); + dealWithActionQueue(); keepPaceWithStore(); String inlongGroupId = taskFromStore.getInlongGroupId(); String inlongStreamId = taskFromStore.getInlongStreamId(); @@ -251,10 +254,10 @@ public class InstanceManager extends AbstractDaemon { }); } - private void dealWithActionQueue(BlockingQueue<InstanceAction> queue) { + private void dealWithActionQueue() { while (isRunnable()) { try { - InstanceAction action = queue.poll(); + InstanceAction action = actionQueue.poll(); if (action == null) { break; } @@ -375,6 +378,15 @@ public class InstanceManager extends AbstractDaemon { instance.getProfile().getSinkDataTime(), 1, 1, auditVersion); } + private void notifyDestroyInstance(String instanceId) { + Instance instance = instanceMap.get(instanceId); + if (instance == null) { + LOGGER.error("try to notify destroy instance but not found: taskId {} instanceId {}", taskId, instanceId); + return; + } + instance.notifyDestroy(); + } + private void addToStore(InstanceProfile profile, boolean addNew) { LOGGER.info("add instance to instance store state {} instanceId {}", profile.getState(), 
profile.getInstanceId()); @@ -433,6 +445,9 @@ public class InstanceManager extends AbstractDaemon { } private void stopAllInstances() { + instanceMap.values().forEach((instance) -> { + notifyDestroyInstance(instance.getInstanceId()); + }); instanceMap.values().forEach((instance) -> { deleteFromMemory(instance.getInstanceId()); }); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index 415b05825a..7267066aee 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -95,15 +95,29 @@ public abstract class CommonInstance extends Instance { @Override public void destroy() { - if (!inited) { - return; - } - doChangeState(State.SUCCEEDED); + Long start = AgentUtils.getCurrentTime(); + notifyDestroy(); while (running) { AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); } + LOGGER.info("destroy instance wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + start = AgentUtils.getCurrentTime(); this.source.destroy(); + LOGGER.info("destroy instance wait source elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + start = AgentUtils.getCurrentTime(); this.sink.destroy(); + LOGGER.info("destroy instance wait sink elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); + } + + @Override + public void notifyDestroy() { + if (!inited) { + return; + } + doChangeState(State.SUCCEEDED); } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java 
index a37a171a37..ec4502a7fb 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -70,6 +70,7 @@ public class SenderManager { private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class); private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); + public static final int RESEND_QUEUE_WAIT_MS = 10; // cache for group and sender list, share the map cross agent lifecycle. private DefaultMessageSender sender; private LinkedBlockingQueue<AgentSenderCallback> resendQueue; @@ -172,9 +173,12 @@ public class SenderManager { } private void closeMessageSender() { + Long start = AgentUtils.getCurrentTime(); if (sender != null) { sender.close(); } + LOGGER.info("close sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start, + profile.getInstanceId()); } private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) { @@ -286,7 +290,7 @@ public class SenderManager { resendRunning = true; while (!shutdown) { try { - AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); + AgentSenderCallback callback = resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS); if (callback != null) { SenderMessage message = callback.message; AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(), diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 8929b33d01..803b9235d2 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -80,8 +80,8 @@ public abstract class 
AbstractSource implements Source { protected final Integer BATCH_READ_LINE_COUNT = 10000; protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; - protected final Integer READ_WAIT_TIMEOUT_MS = 10; - private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60; + protected final Integer WAIT_TIMEOUT_MS = 10; + private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100; private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000; protected BlockingQueue<SourceData> queue; @@ -172,7 +172,7 @@ public abstract class AbstractSource implements Source { emptyCount = 0; } MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - AgentUtils.silenceSleepInSeconds(1); + AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS); continue; } emptyCount = 0; @@ -231,7 +231,7 @@ public abstract class AbstractSource implements Source { if (!isRunnable()) { return false; } - AgentUtils.silenceSleepInSeconds(1); + AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS); } } return true; @@ -247,7 +247,7 @@ public abstract class AbstractSource implements Source { try { boolean offerSuc = false; while (isRunnable() && !offerSuc) { - offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS); + offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } if (!offerSuc) { MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length); @@ -338,7 +338,7 @@ public abstract class AbstractSource implements Source { private SourceData readFromQueue() { SourceData sourceData = null; try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.warn("poll {} data get interrupted.", instanceId); } @@ -405,7 +405,7 @@ public abstract class AbstractSource implements Source { while (queue != null && !queue.isEmpty()) { 
SourceData sourceData = null; try { - sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.warn("poll {} data get interrupted.", instanceId, e); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java index d3dc67df5c..278a9298f9 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/MockInstance.java @@ -55,6 +55,11 @@ public class MockInstance extends Instance { destroyTime = index.getAndAdd(1); } + @Override + public void notifyDestroy() { + + } + @Override public InstanceProfile getProfile() { return profile; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 6ee892c914..5d6871fecb 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -90,7 +90,7 @@ public class TestLogFileSource { Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0); Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2); Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3); - Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10); + Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10); if (offset > 0) { OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(), instanceProfile.getInstanceId(),