This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f45f26bbe2 [INLONG-10542][Agent] Remove the deleted watch directories (#10544)
f45f26bbe2 is described below

commit f45f26bbe255f1ab525be68f453c350b56e84004
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Jul 1 21:16:50 2024 +0800

    [INLONG-10542][Agent] Remove the deleted watch directories (#10544)
    
    * [INLONG-10542][Agent] Remove the deleted watch dirs
    
    * [INLONG-10542][Agent] Remove the deleted watch dirs
---
 .../inlong/agent/plugin/task/file/LogFileTask.java |  1 +
 .../inlong/agent/plugin/task/file/WatchEntity.java | 86 ++++++----------------
 2 files changed, 25 insertions(+), 62 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 44cff75f2f..fbee956b0f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -425,6 +425,7 @@ public class LogFileTask extends AbstractTask {
             return;
         }
         try {
+            entity.removeDeletedWatchDir();
             /* Get all creation events until all events are consumed. */
             for (int i = 0; i < entity.getTotalPathSize(); i++) {
                 // maybe the watchService is closed ,but we catch this 
exception!
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
index a62a602784..af6d018a1d 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
 import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
 import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
 import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
-import org.apache.inlong.agent.utils.DateTransUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +35,6 @@ import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -46,6 +45,10 @@ import java.util.regex.Pattern;
 public class WatchEntity {
 
     private static final Logger logger = 
LoggerFactory.getLogger(WatchEntity.class);
+    // watch 1 dir per hour, clean it every year,
+    // if 100 bytes per dir, it will occupy 876k at most
+    public static final int CLEAN_WATCH_DIR_WATER_LVL = 24 * 365;
+    public static final long CHECK_WATCH_DIR_INTERVAL_MS = 1000 * 60 * 5;
     private WatchService watchService;
     private final String basicStaticPath;
     private final String originPattern;
@@ -59,6 +62,7 @@ public class WatchEntity {
     private final Map<String, WatchKey> pathToKeys = new 
ConcurrentHashMap<String, WatchKey>();
     private final String dirSeparator = System.getProperty("file.separator");
     private String cycleUnit;
+    private long lastCheckTime;
 
     public WatchEntity(WatchService watchService,
             String originPattern,
@@ -88,7 +92,7 @@ public class WatchEntity {
     public String toString() {
         return "WatchEntity [parentPathName=" + basicStaticPath
                 + ", readFilePattern=" + regexPattern
-                + ", dateExpression=" + dateExpression + ", totalDirPattern="
+                + ", dateExpression=" + dateExpression + ", 
originPatternWithoutFileName="
                 + originPatternWithoutFileName + ", containRegexPattern="
                 + containRegexPattern + ", totalDirRegexPattern="
                 + patternWithoutFileName + ", keys=" + keys + ", pathToKeys=" 
+ pathToKeys
@@ -97,7 +101,8 @@ public class WatchEntity {
 
     private boolean isPathContainRegexPattern() {
         if (originPatternWithoutFileName.contains("YYYY") || 
originPatternWithoutFileName.contains("MM")
-                || originPatternWithoutFileName.contains("DD") || 
originPatternWithoutFileName.contains("hh")) {
+                || originPatternWithoutFileName.contains("DD") || 
originPatternWithoutFileName.contains("hh")
+                || originPatternWithoutFileName.contains("mm")) {
             return true;
         }
 
@@ -168,9 +173,7 @@ public class WatchEntity {
         } else {
             return;
         }
-
         logger.info("rootPath len {}", 
rootPath.toAbsolutePath().toString().length());
-
         registerRecursively(rootPath.toFile(), 
rootPath.toAbsolutePath().toString().length() + 1);
     }
 
@@ -261,73 +264,32 @@ public class WatchEntity {
         return dateExpression.getPatternPosition();
     }
 
-    /*
-     * Remove the watched path which is 3 cycle units earlier than current 
task data time, this is because JDK7 starts a
-     * thread for each watch path, which should consume lots of memory.
-     */
-    public void removeUselessWatchDirectories(String curDataTime)
-            throws Exception {
-
-        logger.info("removeUselessWatchDirectories {}", curDataTime);
-
-        /* Calculate the data time which is 3 cycle units earlier than current 
task data time. */
-        long curDataTimeMillis = 
DateTransUtils.timeStrConvertToMillSec(curDataTime, cycleUnit);
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTimeInMillis(curDataTimeMillis);
-        if ("D".equalsIgnoreCase(cycleUnit)) {
-            calendar.add(Calendar.DAY_OF_YEAR, -3);
-        } else if ("h".equalsIgnoreCase(cycleUnit)) {
-            calendar.add(Calendar.HOUR_OF_DAY, -3);
-        } else if ("10m".equalsIgnoreCase(cycleUnit)) {
-            calendar.add(Calendar.MINUTE, -30);
-        }
-
-        /* Calculate the 3 cycle units earlier date. */
-        String year = String.valueOf(calendar.get(Calendar.YEAR));
-        String month = String.valueOf(calendar.get(Calendar.MONTH) + 1);
-        if (month.length() < 2) {
-            month = "0" + month;
-        }
-        String day = String.valueOf(calendar.get(Calendar.DAY_OF_MONTH));
-        if (day.length() < 2) {
-            day = "0" + day;
-        }
-        String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
-        if (hour.length() < 2) {
-            hour = "0" + hour;
+    public void removeDeletedWatchDir() {
+        long now = AgentUtils.getCurrentTime();
+        if (now - lastCheckTime < CHECK_WATCH_DIR_INTERVAL_MS) {
+            return;
         }
-        String minute = String.valueOf(calendar.get(Calendar.MINUTE));
-        if (minute.length() < 2) {
-            minute = "0" + minute;
+        lastCheckTime = now;
+        if (pathToKeys.size() < CLEAN_WATCH_DIR_WATER_LVL) {
+            logger.info("originPattern {} watch dir size {}", originPattern, 
pathToKeys.size());
+            return;
+        } else {
+            logger.info("originPattern {} watch dir size {} > {} try to remove 
the deleted watch dir", originPattern,
+                    pathToKeys.size(), CLEAN_WATCH_DIR_WATER_LVL);
         }
-
-        /* Replace it with the date and get a specified watch path. */
-        String copyDirPattern = new String(originPatternWithoutFileName);
-        copyDirPattern = copyDirPattern.replace("YYYY", year);
-        copyDirPattern = copyDirPattern.replace("MM", month);
-        copyDirPattern = copyDirPattern.replace("DD", day);
-        copyDirPattern = copyDirPattern.replace("hh", hour);
-        copyDirPattern = copyDirPattern.replace("mm", minute);
-
         Set<String> keys = pathToKeys.keySet();
         Set<String> tmpKeys = new HashSet<>();
         tmpKeys.addAll(keys);
-        String rootDir = 
Paths.get(basicStaticPath).toAbsolutePath().toString();
         for (String path : tmpKeys) {
-            /*
-             * Remove the watch path whose path is smaller than the 3 cycle 
units earlier.
-             */
-            logger.info("[Path]{}  {}", path, copyDirPattern);
-            if (path.compareTo(copyDirPattern) < 0 && 
!copyDirPattern.contains(path)) {
+            File folder = new File(path);
+            if (!folder.isDirectory()) {
                 WatchKey key = pathToKeys.get(path);
                 key.cancel();
-
                 pathToKeys.remove(path);
-
-                logger.info("Watch path: {} is too old for data time: {}, we 
should remove", path,
-                        curDataTime);
+                logger.info("path: {} is deleted we should remove the watch", 
path);
             }
         }
+        logger.info("pathToKeys size {} after remove", pathToKeys.size());
     }
 
     public void clearPathToKeys() {

Reply via email to