This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.10 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 7a5a837b766e4f2e361d3cd029f2a1a8b0104856
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Dec 11 15:29:18 2023 +0800

    [INLONG-9454][Agent] Increase exit conditions to prevent dead loops (#9455)

    (cherry picked from commit 702b8e6242964a7ee36b7cc9b847ea4a675e9ded)
---
 .../apache/inlong/agent/plugin/instance/FileInstance.java    |  4 ++--
 .../apache/inlong/agent/plugin/sources/LogFileSource.java    | 12 ++++++++----
 .../agent/plugin/task/filecollect/LogFileCollectTask.java    |  2 +-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index fb6e0442ff..1785b4245f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -118,7 +118,7 @@ public class FileInstance extends Instance {
 
     private void handleReadEnd() {
         InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
-        while (!instanceManager.submitAction(action)) {
+        while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
             AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
@@ -130,7 +130,7 @@ public class FileInstance extends Instance {
         profile.setState(InstanceStateEnum.DELETE);
         profile.setModifyTime(AgentUtils.getCurrentTime());
         InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
-        while (!instanceManager.submitAction(action)) {
+        while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
             AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 0cc97afb8c..e4de812a8a 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -495,16 +495,20 @@ public class LogFileSource extends AbstractSource {
     }
 
     private void putIntoQueue(SourceData sourceData) {
+        if (sourceData == null) {
+            return;
+        }
         try {
             boolean offerSuc = false;
-            while (offerSuc != true) {
+            while (isRunnable() && offerSuc != true) {
                 offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
             }
-            LOGGER.debug("Read {} from file {}", sourceData.getData(), fileName);
-        } catch (InterruptedException e) {
-            if (sourceData != null) {
+            if (!offerSuc) {
                 MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length());
             }
+            LOGGER.debug("Read {} from file {}", sourceData.getData(), fileName);
+        } catch (InterruptedException e) {
+            MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length());
             LOGGER.error("fetchData offer failed {}", e.getMessage());
         }
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 96e4a5e22e..25f3ef456d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -409,7 +409,7 @@ public class LogFileCollectTask extends Task {
         if (!isCurrentDataTime && instanceManager.isFull()) {
             return;
         }
-        while (!instanceManager.submitAction(action)) {
+        while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId());
             AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
         }