This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f45f26bbe2 [INLONG-10542][Agent] Remove the deleted watch directions (#10544) f45f26bbe2 is described below commit f45f26bbe255f1ab525be68f453c350b56e84004 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Jul 1 21:16:50 2024 +0800 [INLONG-10542][Agent] Remove the deleted watch directions (#10544) * [INLONG-10542][Agent] Remove the deleted watch dirs * INLONG-10542][Agent] Remove the deleted watch dirs --- .../inlong/agent/plugin/task/file/LogFileTask.java | 1 + .../inlong/agent/plugin/task/file/WatchEntity.java | 86 ++++++---------------- 2 files changed, 25 insertions(+), 62 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java index 44cff75f2f..fbee956b0f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java @@ -425,6 +425,7 @@ public class LogFileTask extends AbstractTask { return; } try { + entity.removeDeletedWatchDir(); /* Get all creation events until all events are consumed. */ for (int i = 0; i < entity.getTotalPathSize(); i++) { // maybe the watchService is closed ,but we catch this exception! diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java index a62a602784..af6d018a1d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java @@ -22,7 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition; import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; -import org.apache.inlong.agent.utils.DateTransUtils; +import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,6 @@ import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.util.ArrayList; -import java.util.Calendar; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -46,6 +45,10 @@ import java.util.regex.Pattern; public class WatchEntity { private static final Logger logger = LoggerFactory.getLogger(WatchEntity.class); + // watch 1 dir per hour, clean it every year, + // if 100 bytes per dir, it will occupy 876k at most + public static final int CLEAN_WATCH_DIR_WATER_LVL = 24 * 365; + public static final long CHECK_WATCH_DIR_INTERVAL_MS = 1000 * 60 * 5; private WatchService watchService; private final String basicStaticPath; private final String originPattern; @@ -59,6 +62,7 @@ public class WatchEntity { private final Map<String, WatchKey> pathToKeys = new ConcurrentHashMap<String, WatchKey>(); private final String dirSeparator = System.getProperty("file.separator"); private String cycleUnit; + private long lastCheckTime; public WatchEntity(WatchService watchService, String originPattern, @@ -88,7 +92,7 @@ public class WatchEntity { public String toString() { return "WatchEntity [parentPathName=" + basicStaticPath + ", readFilePattern=" + regexPattern - + ", dateExpression=" + dateExpression + ", totalDirPattern=" + + ", dateExpression=" + dateExpression + ", originPatternWithoutFileName=" + originPatternWithoutFileName + ", containRegexPattern=" + containRegexPattern + ", totalDirRegexPattern=" + patternWithoutFileName + ", keys=" + keys + ", pathToKeys=" + pathToKeys @@ -97,7 +101,8 @@ public class WatchEntity { private boolean isPathContainRegexPattern() { if (originPatternWithoutFileName.contains("YYYY") || originPatternWithoutFileName.contains("MM") - || originPatternWithoutFileName.contains("DD") || originPatternWithoutFileName.contains("hh")) { + || originPatternWithoutFileName.contains("DD") || originPatternWithoutFileName.contains("hh") + || originPatternWithoutFileName.contains("mm")) { return true; } @@ -168,9 +173,7 @@ public class WatchEntity { } else { return; } - logger.info("rootPath len {}", rootPath.toAbsolutePath().toString().length()); - registerRecursively(rootPath.toFile(), rootPath.toAbsolutePath().toString().length() + 1); } @@ -261,73 +264,32 @@ public class WatchEntity { return dateExpression.getPatternPosition(); } - /* - * Remove the watched path which is 3 cycle units earlier than current task data time, this is because JDK7 starts a - * thread for each watch path, which should consume lots of memory. - */ - public void removeUselessWatchDirectories(String curDataTime) - throws Exception { - - logger.info("removeUselessWatchDirectories {}", curDataTime); - - /* Calculate the data time which is 3 cycle units earlier than current task data time. */ - long curDataTimeMillis = DateTransUtils.timeStrConvertToMillSec(curDataTime, cycleUnit); - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(curDataTimeMillis); - if ("D".equalsIgnoreCase(cycleUnit)) { - calendar.add(Calendar.DAY_OF_YEAR, -3); - } else if ("h".equalsIgnoreCase(cycleUnit)) { - calendar.add(Calendar.HOUR_OF_DAY, -3); - } else if ("10m".equalsIgnoreCase(cycleUnit)) { - calendar.add(Calendar.MINUTE, -30); - } - - /* Calculate the 3 cycle units earlier date. */ - String year = String.valueOf(calendar.get(Calendar.YEAR)); - String month = String.valueOf(calendar.get(Calendar.MONTH) + 1); - if (month.length() < 2) { - month = "0" + month; - } - String day = String.valueOf(calendar.get(Calendar.DAY_OF_MONTH)); - if (day.length() < 2) { - day = "0" + day; - } - String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY)); - if (hour.length() < 2) { - hour = "0" + hour; + public void removeDeletedWatchDir() { + long now = AgentUtils.getCurrentTime(); + if (now - lastCheckTime < CHECK_WATCH_DIR_INTERVAL_MS) { + return; } - String minute = String.valueOf(calendar.get(Calendar.MINUTE)); - if (minute.length() < 2) { - minute = "0" + minute; + lastCheckTime = now; + if (pathToKeys.size() < CLEAN_WATCH_DIR_WATER_LVL) { + logger.info("originPattern {} watch dir size {}", originPattern, pathToKeys.size()); + return; + } else { + logger.info("originPattern {} watch dir size {} > {} try to remove the deleted watch dir", originPattern, + pathToKeys.size(), CLEAN_WATCH_DIR_WATER_LVL); } - - /* Replace it with the date and get a specified watch path. */ - String copyDirPattern = new String(originPatternWithoutFileName); - copyDirPattern = copyDirPattern.replace("YYYY", year); - copyDirPattern = copyDirPattern.replace("MM", month); - copyDirPattern = copyDirPattern.replace("DD", day); - copyDirPattern = copyDirPattern.replace("hh", hour); - copyDirPattern = copyDirPattern.replace("mm", minute); - Set<String> keys = pathToKeys.keySet(); Set<String> tmpKeys = new HashSet<>(); tmpKeys.addAll(keys); - String rootDir = Paths.get(basicStaticPath).toAbsolutePath().toString(); for (String path : tmpKeys) { - /* - * Remove the watch path whose path is smaller than the 3 cycle units earlier. - */ - logger.info("[Path]{} {}", path, copyDirPattern); - if (path.compareTo(copyDirPattern) < 0 && !copyDirPattern.contains(path)) { + File folder = new File(path); + if (!folder.isDirectory()) { WatchKey key = pathToKeys.get(path); key.cancel(); - pathToKeys.remove(path); - - logger.info("Watch path: {} is too old for data time: {}, we should remove", path, - curDataTime); + logger.info("path: {} is deleted we should remove the watch", path); } } + logger.info("pathToKeys size {} after remove", pathToKeys.size()); } public void clearPathToKeys() {