This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 73f477aae0 [INLONG-11145][Agent] Optimize the logic of supplementing data (#11146) 73f477aae0 is described below commit 73f477aae0e97d725918e985c4539612626d076a Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Sep 19 17:27:19 2024 +0800 [INLONG-11145][Agent] Optimize the logic of supplementing data (#11146) --- .../agent/core/instance/InstanceManager.java | 23 +++++++++------------- .../inlong/agent/plugin/task/AbstractTask.java | 3 ++- .../inlong/agent/plugin/task/file/LogFileTask.java | 5 ++++- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 333c12a7d2..48aedfd09d 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -79,6 +79,7 @@ public class InstanceManager extends AbstractDaemon { private volatile boolean runAtLeastOneTime = false; private volatile boolean running = false; private final double reserveCoefficient = 0.8; + private long finishedInstanceCount = 0; private class InstancePrintStat { @@ -318,6 +319,8 @@ public class InstanceManager extends AbstractDaemon { private void addInstance(InstanceProfile profile) { if (instanceMap.size() >= instanceLimit) { LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit); + actionQueue.offer(new InstanceAction(ActionType.ADD, profile)); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); return; } LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId()); @@ -337,6 +340,7 @@ public class InstanceManager extends AbstractDaemon { deleteFromMemory(profile.getInstanceId()); LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(), profile.getTaskId(), profile.getInstanceId()); + finishedInstanceCount++; } private void deleteInstance(String instanceId) { @@ -458,23 +462,14 @@ public class InstanceManager extends AbstractDaemon { return (instanceMap.size() + actionQueue.size()) >= instanceLimit * reserveCoefficient; } - public boolean allInstanceFinished() { - if (!runAtLeastOneTime) { - return false; - } - if (!instanceMap.isEmpty()) { - return false; - } - if (!actionQueue.isEmpty()) { - return false; - } + public long getFinishedInstanceCount() { + int count = 0; List<InstanceProfile> instances = instanceStore.getInstances(taskId); for (int i = 0; i < instances.size(); i++) { - InstanceProfile profile = instances.get(i); - if (profile.getState() != InstanceStateEnum.FINISHED) { - return false; + if (instances.get(i).getState() == InstanceStateEnum.FINISHED) { + count++; } } - return true; + return count; } } \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index 75d87bb235..d9ec53ab0b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -52,6 +52,7 @@ public abstract class AbstractTask extends Task { protected boolean initOK = false; protected long lastPrintTime = 0; protected long auditVersion; + protected long instanceCount = 0; @Override public void init(Object srcManager, TaskProfile taskProfile, Store basicStore) throws IOException { @@ -152,7 +153,7 @@ public abstract class AbstractTask extends Task { } protected boolean allInstanceFinished() { - return instanceManager.allInstanceFinished(); + return instanceCount == instanceManager.getFinishedInstanceCount(); } protected boolean shouldAddAgain(String fileName, long lastModifyTime) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java index fbee956b0f..4f49cfdd7d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java @@ -241,7 +241,7 @@ public class LogFileTask extends AbstractTask { runAtLeastOneTime = true; } dealWithEventMap(); - if (instanceQueue.isEmpty() && allInstanceFinished()) { + if (allInstanceFinished()) { LOGGER.info("retry task finished, send action to task manager, taskId {}", getTaskId()); TaskAction action = new TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile); taskManager.submitAction(action); @@ -264,6 +264,9 @@ public class LogFileTask extends AbstractTask { LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size()); fileInfos.forEach((fileInfo) -> { addToEvenMap(fileInfo.fileName, fileInfo.dataTime); + if (retry) { + instanceCount++; + } }); }); }