This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a891202393 [INLONG-9214][Agent] Limit max file count to collect once (#9216) a891202393 is described below commit a8912023936b5742ef6113f975ca0fa1eb6e5a83 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Nov 3 17:40:13 2023 +0800 [INLONG-9214][Agent] Limit max file count to collect once (#9216) --- .../apache/inlong/agent/core/instance/InstanceManager.java | 11 +++++++---- .../inlong/agent/core/instance/TestInstanceManager.java | 2 +- .../inlong/agent/plugin/task/filecollect/FileScanner.java | 14 ++++++-------- .../agent/plugin/task/filecollect/LogFileCollectTask.java | 5 +++-- .../org/apache/inlong/agent/plugin/utils/PluginUtils.java | 3 +-- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index e209535db8..de96b93bc1 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.InstanceDb; import org.apache.inlong.agent.plugin.Instance; @@ -62,7 +61,7 @@ public class InstanceManager extends AbstractDaemon { new SynchronousQueue<>(), new AgentThreadFactory("instance-manager")); - private final int taskMaxLimit; + private final int instanceLimit; private final AgentConfiguration agentConf; private final String taskId; private volatile boolean runAtLeastOneTime = false; @@ -71,12 +70,12 @@ public class InstanceManager extends AbstractDaemon { /** * Init task manager. */ - public InstanceManager(String taskId, Db basicDb) { + public InstanceManager(String taskId, int instanceLimit, Db basicDb) { this.taskId = taskId; instanceDb = new InstanceDb(basicDb); this.agentConf = AgentConfiguration.getAgentConf(); instanceMap = new ConcurrentHashMap<>(); - taskMaxLimit = agentConf.getInt(AgentConstants.JOB_NUMBER_LIMIT, AgentConstants.DEFAULT_JOB_NUMBER_LIMIT); + this.instanceLimit = instanceLimit; actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); } @@ -236,6 +235,10 @@ public class InstanceManager extends AbstractDaemon { } private void addInstance(InstanceProfile profile) { + if (instanceMap.size() >= instanceLimit) { + LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit); + return; + } LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId()); if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) { LOGGER.info("shouldAddAgain returns false skip taskId {} instanceId {}", taskId, profile.getInstanceId()); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index e858743cb3..b1107bb2ab 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -50,7 +50,7 @@ public class TestInstanceManager { String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt"; Db basicDb = TaskManager.initDb("/localdb"); taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); - manager = new InstanceManager("1", basicDb); + manager = new InstanceManager("1", 2, basicDb); manager.start(); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java index bfb1a7a80a..efa87e22dc 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java @@ -18,7 +18,6 @@ package org.apache.inlong.agent.plugin.task.filecollect; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator; import org.apache.inlong.agent.plugin.utils.file.Files; @@ -35,6 +34,8 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM; + /* * This class is mainly used for scanning log file that we want to read. We use this class at * inlong_agent recover process, the do and redo tasks and the current log file access when we deploy a @@ -69,14 +70,12 @@ public class FileScanner { logger.info("task {} this scan time is between {} and {}.", new Object[]{conf.getTaskId(), startTime, endTime}); - return scanTaskBetweenTimes(conf, originPattern, startTime, endTime); + return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, startTime, endTime); } /* Scan log files and create tasks between two times. */ - public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, String startTime, + public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String originPattern, String startTime, String endTime) { - String cycleUnit = conf.getCycleUnit(); - int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM); List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit); List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>(); for (Long time : dateRegion) { @@ -87,7 +86,7 @@ public class FileScanner { String firstDir = allPaths.get(0); String secondDir = allPaths.get(0) + File.separator + allPaths.get(1); ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, filename, 3, - maxFileNum); + DEFAULT_FILE_MAX_NUM); for (String file : fileList) { // TODO the time is not YYYYMMDDHH String dataTime = NewDateUtils.millSecConvertToTimeStr(time, cycleUnit); @@ -100,8 +99,7 @@ public class FileScanner { return infos; } - public static ArrayList<String> scanFile(TaskProfile conf, String originPattern, long dataTime) { - int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM); + public static ArrayList<String> scanFile(int maxFileNum, String originPattern, long dataTime) { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(dataTime); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 54170a5290..1629b89595 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -113,7 +113,8 @@ public class LogFileCollectTask extends Task { retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false); originPatterns = Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(",")) .collect(Collectors.toSet()); - instanceManager = new InstanceManager(taskProfile.getTaskId(), basicDb); + instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), + basicDb); try { instanceManager.start(); } catch (Exception e) { @@ -409,7 +410,7 @@ public class LogFileCollectTask extends Task { continue; } if (Files.isDirectory(child)) { - LOGGER.warn("The find creation event is triggered by a directory: " + child + LOGGER.info("The find creation event is triggered by a directory: " + child .getFileName()); entity.registerRecursively(child); continue; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java index 74f7f238db..e08313c10b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java @@ -55,7 +55,6 @@ import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS; import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST; import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT; import static org.apache.inlong.agent.constant.TaskConstants.FILE_DIR_FILTER_PATTERNS; -import static org.apache.inlong.agent.constant.TaskConstants.FILE_MAX_NUM; import static org.apache.inlong.agent.constant.TaskConstants.JOB_RETRY_TIME; import static org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_TIME_OFFSET; @@ -97,7 +96,7 @@ public class PluginUtils { Set<PathPattern> pathPatterns = PathPattern.buildPathPattern(dirPatterns, jobConf.get(TASK_FILE_TIME_OFFSET, null)); updateRetryTime(jobConf, pathPatterns); - int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM); + int maxFileNum = DEFAULT_FILE_MAX_NUM; LOGGER.info("dir pattern {}, max file num {}", dirPatterns, maxFileNum); Collection<File> allFiles = new ArrayList<>(); pathPatterns.forEach(pathPattern -> allFiles.addAll(pathPattern.walkSuitableFiles(maxFileNum)));