This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7a5a837b766e4f2e361d3cd029f2a1a8b0104856
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Dec 11 15:29:18 2023 +0800

    [INLONG-9454][Agent] Increase exit conditions to prevent dead loops (#9455)
    
    (cherry picked from commit 702b8e6242964a7ee36b7cc9b847ea4a675e9ded)
---
 .../apache/inlong/agent/plugin/instance/FileInstance.java    |  4 ++--
 .../apache/inlong/agent/plugin/sources/LogFileSource.java    | 12 ++++++++----
 .../agent/plugin/task/filecollect/LogFileCollectTask.java    |  2 +-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index fb6e0442ff..1785b4245f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -118,7 +118,7 @@ public class FileInstance extends Instance {
 
     private void handleReadEnd() {
         InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
-        while (!instanceManager.submitAction(action)) {
+        while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
             AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
@@ -130,7 +130,7 @@ public class FileInstance extends Instance {
         profile.setState(InstanceStateEnum.DELETE);
         profile.setModifyTime(AgentUtils.getCurrentTime());
         InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
-        while (!instanceManager.submitAction(action)) {
+        while (!isFinished() && !instanceManager.submitAction(action)) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
             AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 0cc97afb8c..e4de812a8a 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -495,16 +495,20 @@ public class LogFileSource extends AbstractSource {
     }
 
     private void putIntoQueue(SourceData sourceData) {
+        if (sourceData == null) {
+            return;
+        }
         try {
             boolean offerSuc = false;
-            while (offerSuc != true) {
+            while (isRunnable() && offerSuc != true) {
                 offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
             }
-            LOGGER.debug("Read {} from file {}", sourceData.getData(), 
fileName);
-        } catch (InterruptedException e) {
-            if (sourceData != null) {
+            if (!offerSuc) {
                 
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
             }
+            LOGGER.debug("Read {} from file {}", sourceData.getData(), 
fileName);
+        } catch (InterruptedException e) {
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length());
             LOGGER.error("fetchData offer failed {}", e.getMessage());
         }
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 96e4a5e22e..25f3ef456d 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -409,7 +409,7 @@ public class LogFileCollectTask extends Task {
                 if (!isCurrentDataTime && instanceManager.isFull()) {
                     return;
                 }
-                while (!instanceManager.submitAction(action)) {
+                while (!isFinished() && !instanceManager.submitAction(action)) 
{
                     LOGGER.error("instance manager action queue is full: 
taskId {}", instanceManager.getTaskId());
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
                 }

Reply via email to