This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73f477aae0 [INLONG-11145][Agent] Optimize the logic of supplementing data (#11146)
73f477aae0 is described below

commit 73f477aae0e97d725918e985c4539612626d076a
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Sep 19 17:27:19 2024 +0800

    [INLONG-11145][Agent] Optimize the logic of supplementing data (#11146)
---
 .../agent/core/instance/InstanceManager.java       | 23 +++++++++-------------
 .../inlong/agent/plugin/task/AbstractTask.java     |  3 ++-
 .../inlong/agent/plugin/task/file/LogFileTask.java |  5 ++++-
 3 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 333c12a7d2..48aedfd09d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -79,6 +79,7 @@ public class InstanceManager extends AbstractDaemon {
     private volatile boolean runAtLeastOneTime = false;
     private volatile boolean running = false;
     private final double reserveCoefficient = 0.8;
+    private long finishedInstanceCount = 0;
 
     private class InstancePrintStat {
 
@@ -318,6 +319,8 @@ public class InstanceManager extends AbstractDaemon {
     private void addInstance(InstanceProfile profile) {
         if (instanceMap.size() >= instanceLimit) {
             LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
+            actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
             return;
         }
         LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId());
@@ -337,6 +340,7 @@ public class InstanceManager extends AbstractDaemon {
         deleteFromMemory(profile.getInstanceId());
         LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(),
                 profile.getTaskId(), profile.getInstanceId());
+        finishedInstanceCount++;
     }
 
     private void deleteInstance(String instanceId) {
@@ -458,23 +462,14 @@ public class InstanceManager extends AbstractDaemon {
         return (instanceMap.size() + actionQueue.size()) >= instanceLimit * reserveCoefficient;
     }
 
-    public boolean allInstanceFinished() {
-        if (!runAtLeastOneTime) {
-            return false;
-        }
-        if (!instanceMap.isEmpty()) {
-            return false;
-        }
-        if (!actionQueue.isEmpty()) {
-            return false;
-        }
+    public long getFinishedInstanceCount() {
+        int count = 0;
         List<InstanceProfile> instances = instanceStore.getInstances(taskId);
         for (int i = 0; i < instances.size(); i++) {
-            InstanceProfile profile = instances.get(i);
-            if (profile.getState() != InstanceStateEnum.FINISHED) {
-                return false;
+            if (instances.get(i).getState() == InstanceStateEnum.FINISHED) {
+                count++;
             }
         }
-        return true;
+        return count;
     }
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 75d87bb235..d9ec53ab0b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -52,6 +52,7 @@ public abstract class AbstractTask extends Task {
     protected boolean initOK = false;
     protected long lastPrintTime = 0;
     protected long auditVersion;
+    protected long instanceCount = 0;
 
     @Override
     public void init(Object srcManager, TaskProfile taskProfile, Store basicStore) throws IOException {
@@ -152,7 +153,7 @@ public abstract class AbstractTask extends Task {
     }
 
     protected boolean allInstanceFinished() {
-        return instanceManager.allInstanceFinished();
+        return instanceCount == instanceManager.getFinishedInstanceCount();
     }
 
     protected boolean shouldAddAgain(String fileName, long lastModifyTime) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index fbee956b0f..4f49cfdd7d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -241,7 +241,7 @@ public class LogFileTask extends AbstractTask {
             runAtLeastOneTime = true;
         }
         dealWithEventMap();
-        if (instanceQueue.isEmpty() && allInstanceFinished()) {
+        if (allInstanceFinished()) {
             LOGGER.info("retry task finished, send action to task manager, taskId {}", getTaskId());
             TaskAction action = new TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
             taskManager.submitAction(action);
@@ -264,6 +264,9 @@ public class LogFileTask extends AbstractTask {
             LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size());
             fileInfos.forEach((fileInfo) -> {
                 addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+                if (retry) {
+                    instanceCount++;
+                }
             });
         });
     }

Reply via email to