This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new a891202393 [INLONG-9214][Agent] Limit max file count to collect once (#9216)
a891202393 is described below

commit a8912023936b5742ef6113f975ca0fa1eb6e5a83
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Nov 3 17:40:13 2023 +0800

    [INLONG-9214][Agent] Limit max file count to collect once (#9216)
---
 .../apache/inlong/agent/core/instance/InstanceManager.java | 11 +++++++----
 .../inlong/agent/core/instance/TestInstanceManager.java    |  2 +-
 .../inlong/agent/plugin/task/filecollect/FileScanner.java  | 14 ++++++--------
 .../agent/plugin/task/filecollect/LogFileCollectTask.java  |  5 +++--
 .../org/apache/inlong/agent/plugin/utils/PluginUtils.java  |  3 +--
 5 files changed, 18 insertions(+), 17 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index e209535db8..de96b93bc1 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.InstanceDb;
 import org.apache.inlong.agent.plugin.Instance;
@@ -62,7 +61,7 @@ public class InstanceManager extends AbstractDaemon {
             new SynchronousQueue<>(),
             new AgentThreadFactory("instance-manager"));
 
-    private final int taskMaxLimit;
+    private final int instanceLimit;
     private final AgentConfiguration agentConf;
     private final String taskId;
     private volatile boolean runAtLeastOneTime = false;
@@ -71,12 +70,12 @@ public class InstanceManager extends AbstractDaemon {
     /**
      * Init task manager.
      */
-    public InstanceManager(String taskId, Db basicDb) {
+    public InstanceManager(String taskId, int instanceLimit, Db basicDb) {
         this.taskId = taskId;
         instanceDb = new InstanceDb(basicDb);
         this.agentConf = AgentConfiguration.getAgentConf();
         instanceMap = new ConcurrentHashMap<>();
-        taskMaxLimit = agentConf.getInt(AgentConstants.JOB_NUMBER_LIMIT, 
AgentConstants.DEFAULT_JOB_NUMBER_LIMIT);
+        this.instanceLimit = instanceLimit;
         actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
     }
 
@@ -236,6 +235,10 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void addInstance(InstanceProfile profile) {
+        if (instanceMap.size() >= instanceLimit) {
+            LOGGER.error("instanceMap size {} over limit {}", 
instanceMap.size(), instanceLimit);
+            return;
+        }
         LOGGER.info("add instance taskId {} instanceId {}", taskId, 
profile.getInstanceId());
         if (!shouldAddAgain(profile.getInstanceId(), 
profile.getFileUpdateTime())) {
             LOGGER.info("shouldAddAgain returns false skip taskId {} 
instanceId {}", taskId, profile.getInstanceId());
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index e858743cb3..b1107bb2ab 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -50,7 +50,7 @@ public class TestInstanceManager {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
         Db basicDb = TaskManager.initDb("/localdb");
         taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING);
-        manager = new InstanceManager("1", basicDb);
+        manager = new InstanceManager("1", 2, basicDb);
         manager.start();
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index bfb1a7a80a..efa87e22dc 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.agent.plugin.task.filecollect;
 
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
 import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
 import org.apache.inlong.agent.plugin.utils.file.Files;
@@ -35,6 +34,8 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
+
 /*
  * This class is mainly used for scanning log file that we want to read. We 
use this class at
  * inlong_agent recover process, the do and redo tasks and the current log 
file access when we deploy a
@@ -69,14 +70,12 @@ public class FileScanner {
         logger.info("task {} this scan time is between {} and {}.",
                 new Object[]{conf.getTaskId(), startTime, endTime});
 
-        return scanTaskBetweenTimes(conf, originPattern, startTime, endTime);
+        return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, 
startTime, endTime);
     }
 
     /* Scan log files and create tasks between two times. */
-    public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, 
String originPattern, String startTime,
+    public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, 
String originPattern, String startTime,
             String endTime) {
-        String cycleUnit = conf.getCycleUnit();
-        int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
         List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, 
cycleUnit);
         List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>();
         for (Long time : dateRegion) {
@@ -87,7 +86,7 @@ public class FileScanner {
             String firstDir = allPaths.get(0);
             String secondDir = allPaths.get(0) + File.separator + 
allPaths.get(1);
             ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, 
secondDir, filename, 3,
-                    maxFileNum);
+                    DEFAULT_FILE_MAX_NUM);
             for (String file : fileList) {
                 // TODO the time is not YYYYMMDDHH
                 String dataTime = NewDateUtils.millSecConvertToTimeStr(time, 
cycleUnit);
@@ -100,8 +99,7 @@ public class FileScanner {
         return infos;
     }
 
-    public static ArrayList<String> scanFile(TaskProfile conf, String 
originPattern, long dataTime) {
-        int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
+    public static ArrayList<String> scanFile(int maxFileNum, String 
originPattern, long dataTime) {
         Calendar calendar = Calendar.getInstance();
         calendar.setTimeInMillis(dataTime);
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 54170a5290..1629b89595 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -113,7 +113,8 @@ public class LogFileCollectTask extends Task {
         retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
         originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
                 .collect(Collectors.toSet());
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
basicDb);
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
+                basicDb);
         try {
             instanceManager.start();
         } catch (Exception e) {
@@ -409,7 +410,7 @@ public class LogFileCollectTask extends Task {
                 continue;
             }
             if (Files.isDirectory(child)) {
-                LOGGER.warn("The find creation event is triggered by a 
directory: " + child
+                LOGGER.info("The find creation event is triggered by a 
directory: " + child
                         .getFileName());
                 entity.registerRecursively(child);
                 continue;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 74f7f238db..e08313c10b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -55,7 +55,6 @@ import static 
org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
 import static 
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
 import static 
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
 import static 
org.apache.inlong.agent.constant.TaskConstants.FILE_DIR_FILTER_PATTERNS;
-import static org.apache.inlong.agent.constant.TaskConstants.FILE_MAX_NUM;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_RETRY_TIME;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_TIME_OFFSET;
 
@@ -97,7 +96,7 @@ public class PluginUtils {
         Set<PathPattern> pathPatterns =
                 PathPattern.buildPathPattern(dirPatterns, 
jobConf.get(TASK_FILE_TIME_OFFSET, null));
         updateRetryTime(jobConf, pathPatterns);
-        int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
+        int maxFileNum = DEFAULT_FILE_MAX_NUM;
         LOGGER.info("dir pattern {}, max file num {}", dirPatterns, 
maxFileNum);
         Collection<File> allFiles = new ArrayList<>();
         pathPatterns.forEach(pathPattern -> 
allFiles.addAll(pathPattern.walkSuitableFiles(maxFileNum)));

Reply via email to