This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 119240cdbc [INLONG-9143][Agent] Add log file collect task (#9145)
119240cdbc is described below

commit 119240cdbc806cb0f98058625f460809b9b69ed5
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Oct 30 11:10:55 2023 +0800

    [INLONG-9143][Agent] Add log file collect task (#9145)
    
    * [INLONG-9143][Agent] Add log file collect task
    
    * [INLONG-9143][Agent] Add log file collect task
    
    fix ut failed
    
    * [INLONG-9143][Agent] Add log file collect task
    
    Try to fix check failed
    
    * [INLONG-9143][Agent] Add log file collect task
    
    fix ut error
---
 .../apache/inlong/agent/plugin/task/CronTask.java  |  58 +++
 .../plugin/task/FormatDateLogFileCollectTask.java  |  27 ++
 .../inlong/agent/plugin/task/PathPattern.java      | 181 ++++++++
 .../agent/plugin/task/filecollect/AgentErrMsg.java |  67 +++
 .../agent/plugin/task/filecollect/FileScanner.java | 174 +++++++
 .../task/filecollect/LogFileCollectTask.java       | 501 +++++++++++++++++++++
 .../agent/plugin/task/filecollect/TaskType.java    |  39 ++
 .../agent/plugin/task/filecollect/WatchEntity.java | 358 +++++++++++++++
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  |  50 +-
 .../agent/plugin/task/MockInstanceManager.java     |  24 +
 .../agent/plugin/task/TestLogfileCollectTask.java  | 119 +++++
 .../src/test/resources/test/20230928_1.txt         |   3 +
 12 files changed, 1593 insertions(+), 8 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
new file mode 100644
index 0000000000..0216eb96c6
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.file.Task;
+
+/**
+ * Placeholder task intended to generate jobs from a crontab expression.
+ *
+ * <p>Scheduling is not implemented yet: {@link #run()}, {@link #destroy()} and {@link #addCallbacks()}
+ * are no-ops. The profile passed to {@link #init} is kept so that {@link #getProfile()} and
+ * {@link #getTaskId()} return real values instead of always returning {@code null}.
+ */
+public class CronTask extends Task {
+
+    // profile received in init(); null until init() has been called
+    private TaskProfile profile;
+
+    @Override
+    public void init(Object srcManager, TaskProfile profile, Db basicDb) {
+        // Retain the profile so callers that identify tasks by id or profile do not receive null.
+        this.profile = profile;
+    }
+
+    @Override
+    public void run() {
+        // TODO: cron-based job generation is not implemented yet.
+    }
+
+    @Override
+    public void destroy() {
+        // Nothing to release: this placeholder acquires no resources.
+    }
+
+    /**
+     * @return the profile passed to {@link #init}, or {@code null} before init
+     */
+    @Override
+    public TaskProfile getProfile() {
+        return profile;
+    }
+
+    /**
+     * @return the task id of the profile passed to {@link #init}, or {@code null} before init
+     */
+    @Override
+    public String getTaskId() {
+        return profile == null ? null : profile.getTaskId();
+    }
+
+    @Override
+    public void addCallbacks() {
+        // No callbacks to register.
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java
new file mode 100644
index 0000000000..b8837fc0df
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
+
+/**
+ * Directory trigger with format date.
+ *
+ * <p>Currently adds no behavior of its own: every method is inherited unchanged from
+ * {@link LogFileCollectTask}.
+ */
+public class FormatDateLogFileCollectTask extends LogFileCollectTask {
+
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java
new file mode 100644
index 0000000000..137b0a98de
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.plugin.filter.DateFormatRegex;
+import org.apache.inlong.agent.utils.PathUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Path pattern for file filter.
+ * It’s identified by watchDir, which matches {@link PathPattern#whiteList}.
+ */
+public class PathPattern {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PathPattern.class);
+
+    private final String rootDir;
+    private final Set<String> subDirs;
+    // regex for those files should be matched
+    private final Set<DateFormatRegex> whiteList;
+
+    public PathPattern(String rootDir, Set<String> whiteList) {
+        this(rootDir, whiteList, null);
+    }
+
+    public PathPattern(String rootDir, Set<String> whiteList, String offset) {
+        this.rootDir = rootDir;
+        this.subDirs = new HashSet<>();
+        if (offset != null && StringUtils.isNotBlank(offset)) {
+            this.whiteList = whiteList.stream()
+                    .map(whiteRegex -> 
DateFormatRegex.ofRegex(whiteRegex).withOffset(offset))
+                    .collect(Collectors.toSet());
+            updateDateFormatRegex();
+        } else {
+            this.whiteList = whiteList.stream()
+                    .map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex))
+                    .collect(Collectors.toSet());
+        }
+    }
+
+    public static Set<PathPattern> buildPathPattern(Set<String> whiteList,
+            String offset) {
+        Set<String> commonWatchDir = PathUtils.findCommonRootPath(whiteList);
+        return commonWatchDir.stream().map(rootDir -> {
+            Set<String> commonWatchDirWhiteList =
+                    whiteList.stream()
+                            .filter(whiteRegex -> 
whiteRegex.startsWith(rootDir))
+                            .collect(Collectors.toSet());
+            return new PathPattern(rootDir, commonWatchDirWhiteList, offset);
+        }).collect(Collectors.toSet());
+    }
+
+    /**
+     * cleanup local cache, subDirs is only used to filter duplicated 
directories
+     * in one term watch key check.
+     */
+    public void cleanup() {
+        subDirs.clear();
+    }
+
+    /**
+     * Research all children files with {@link PathPattern#rootDir} matched 
whiteList and filtered by blackList.
+     *
+     * @param maxNum
+     * @return
+     */
+    public Collection<File> walkSuitableFiles(int maxNum) {
+        Collection<File> suitableFiles = new ArrayList<>();
+        walkSuitableFiles(suitableFiles, new File(rootDir), maxNum);
+        return suitableFiles;
+    }
+
+    private void walkSuitableFiles(Collection<File> suitableFiles, File file, 
int maxNum) {
+        if (suitableFiles.size() > maxNum) {
+            LOGGER.warn("Suitable files exceed max num {}, just return.", 
maxNum);
+            return;
+        }
+
+        if (suitable(file.getAbsolutePath())) {
+            if (file.isFile()) {
+                suitableFiles.add(file);
+            } else if (file.isDirectory()) {
+                Stream.of(file.listFiles()).forEach(subFile -> 
walkSuitableFiles(suitableFiles, subFile, maxNum));
+            }
+        }
+    }
+
+    /**
+     * Check whether path is suitable for match whiteList and filtered by 
blackList
+     *
+     * @param path pathString
+     * @return true if suit else false.
+     */
+    public boolean suitable(String path) {
+        // remove common root path
+        String briefSubDir = StringUtils.substringAfter(path, rootDir);
+        // if already watched, then stop deep find
+        if (subDirs.contains(briefSubDir)) {
+            LOGGER.info("already watched {}", path);
+            return false;
+        }
+
+        subDirs.add(briefSubDir);
+        File file = new File(path);
+        return whiteList.stream()
+                .filter(whiteRegex -> whiteRegex.match(file))
+                .findAny()
+                .isPresent();
+    }
+
+    /**
+     * when a new file is found, update regex since time may change.
+     */
+    public void updateDateFormatRegex() {
+        whiteList.forEach(DateFormatRegex::setRegexWithCurrentTime);
+    }
+
+    /**
+     * when job is retry job, the time for searching file should be specified.
+     */
+    public void updateDateFormatRegex(String time) {
+        whiteList.forEach(whiteRegex -> whiteRegex.setRegexWithTime(time));
+    }
+
+    @Override
+    public String toString() {
+        return rootDir;
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(rootDir, false);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof PathPattern) {
+            PathPattern entity = (PathPattern) object;
+            return entity.rootDir.equals(this.rootDir);
+        } else {
+            return false;
+        }
+    }
+
+    public String getRootDir() {
+        return rootDir;
+    }
+
+    public String getSuitTime() {
+        // todo: Adapt to datetime in the case of multiple regex
+        return whiteList.stream().findAny().get().getFormattedTime();
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
new file mode 100644
index 0000000000..eff927736d
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+public class AgentErrMsg {
+
+    public static final String CONFIG_SUCCESS = "SUCCESS";
+
+    // 数据源配置异常 */
+    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-TDAgent|10001|ERROR"
+            + "|ERROR_DATA_SOURCE_CONFIG|";
+
+    // 监控文件夹不存在 */
+    public static final String DIRECTORY_NOT_FOUND_ERROR = 
"ERROR-0-TDAgent|11001|WARN"
+            + "|WARN_DIRECTORY_NOT_EXIST|";
+
+    // 监控文件夹时出错 */
+    public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR"
+            + "|ERROR_WATCH_DIR_ERROR|";
+
+    // 要读取的文件异常(不存在,rotate)
+    public static final String FILE_ERROR = 
"ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|";
+
+    // 读取文件异常
+    public static final String FILE_OP_ERROR = 
"ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|";
+
+    // 磁盘满
+    public static final String DISK_FULL = 
"ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|";
+
+    // 内存溢出
+    public static final String OOM_ERROR = 
"ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|";
+
+    // watcher异常
+    public static final String WATCHER_INVALID = 
"ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|";
+
+    // 连不上tdmanager
+    public static final String CONNECT_TDM_ERROR = 
"ERROR-1-TDAgent|30002|ERROR"
+            + "|ERROR_CANNOT_CONNECT_TO_TDM|";
+
+    // 发送数据到tdbus失败
+    public static final String SEND_TO_BUS_ERROR = 
"ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|";
+
+    // 操作bdb异常
+    public static final String BDB_ERROR = 
"ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|";
+
+    // 内部缓存满
+    public static final String MSG_BUFFER_FULL = 
"ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|";
+
+    // 监控到的事件不合法(任务已删除)
+    public static final String FOUND_EVENT_INVALID = 
"ERROR-1-TDAgent|30003|ERROR"
+            + "|FOUND_EVENT_INVALID|";
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
new file mode 100644
index 0000000000..46896d6cdc
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
+import org.apache.inlong.agent.plugin.utils.file.Files;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/*
+ * Scans for the log files a task should read. It is used during agent recovery, for do/redo (retry)
+ * tasks, and to pick up the current log files when a new data source is deployed.
+ */
+public class FileScanner {
+
+    /** A matched file path together with the data time it was matched for. */
+    public static class BasicFileInfo {
+
+        public String fileName;
+        public String dataTime;
+
+        public BasicFileInfo(String fileName, String dataTime) {
+            this.fileName = fileName;
+            this.dataTime = dataTime;
+        }
+
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(FileScanner.class);
+
+    // depth passed to the recursive directory search
+    private static final int SEARCH_DEPTH = 3;
+
+    /**
+     * Scans files matching {@code originPattern} for every cycle between two epoch-millisecond times.
+     *
+     * @param conf task profile supplying cycle unit, time offset and max file number
+     * @param originPattern file path pattern that may contain date expressions
+     * @param failTime start of the range in epoch millis
+     * @param recoverTime end of the range in epoch millis
+     * @param isRetry when false, the profile's time offset is subtracted from both bounds
+     * @return matched files with their data times
+     */
+    public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, long failTime,
+            long recoverTime, boolean isRetry) {
+        String cycleUnit = conf.getCycleUnit();
+        if (!isRetry) {
+            failTime -= NewDateUtils.caclOffset(conf.getTimeOffset());
+            recoverTime -= NewDateUtils.caclOffset(conf.getTimeOffset());
+        }
+
+        String startTime = NewDateUtils.millSecConvertToTimeStr(failTime, cycleUnit);
+        String endTime = NewDateUtils.millSecConvertToTimeStr(recoverTime, cycleUnit);
+        logger.info("task {} this scan time is between {} and {}.", conf.getTaskId(), startTime, endTime);
+
+        return scanTaskBetweenTimes(conf, originPattern, startTime, endTime);
+    }
+
+    /**
+     * Scans log files for every cycle between two formatted times.
+     *
+     * @param conf task profile supplying cycle unit and max file number
+     * @param originPattern file path pattern that may contain date expressions
+     * @param startTime start time, formatted per the cycle unit
+     * @param endTime end time, formatted per the cycle unit
+     * @return matched files with their data times
+     */
+    public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, String startTime,
+            String endTime) {
+        String cycleUnit = conf.getCycleUnit();
+        int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
+        List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit);
+        List<BasicFileInfo> infos = new ArrayList<>();
+        for (Long time : dateRegion) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTimeInMillis(time);
+            String filename = NewDateUtils.replaceDateExpression(calendar, originPattern);
+            ArrayList<String> fileList = findMatchingFiles(filename, maxFileNum);
+            // TODO the time is not YYYYMMDDHH
+            // the data time depends only on the cycle, so compute it once for all of its files
+            String dataTime = NewDateUtils.millSecConvertToTimeStr(time, cycleUnit);
+            for (String file : fileList) {
+                logger.info("scan new task fileName {} ,dataTime {}", file, dataTime);
+                infos.add(new BasicFileInfo(file, dataTime));
+            }
+        }
+        return infos;
+    }
+
+    /**
+     * Scans files matching {@code originPattern} for a single data time.
+     *
+     * @param conf task profile supplying the max file number
+     * @param originPattern file path pattern that may contain date expressions
+     * @param dataTime epoch millis used to resolve the date expressions
+     * @return absolute paths of matched files, sorted with FileTimeComparator
+     */
+    public static ArrayList<String> scanFile(TaskProfile conf, String originPattern, long dataTime) {
+        int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(dataTime);
+
+        String filename = NewDateUtils.replaceDateExpression(calendar, originPattern);
+        return findMatchingFiles(filename, maxFileNum);
+    }
+
+    /* Split a concrete file name into its first two directory levels and search below them. */
+    private static ArrayList<String> findMatchingFiles(String filename, int maxFileNum) {
+        ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
+        String firstDir = allPaths.get(0);
+        String secondDir = firstDir + File.separator + allPaths.get(1);
+        return getUpdatedOrNewFiles(firstDir, secondDir, filename, SEARCH_DEPTH, maxFileNum);
+    }
+
+    private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, String secondDir,
+            String fileName, long depth, int maxFileNum) {
+        ArrayList<String> ret = new ArrayList<>();
+        if (!new File(firstDir).isDirectory()) {
+            return ret;
+        }
+
+        ArrayList<File> readyFiles = new ArrayList<>();
+        for (File pathname : Files.find(firstDir).yieldFilesAndDirectories()
+                .recursive().withDepth((int) depth).withDirNameRegex(secondDir)
+                .withFileNameRegex(fileName)) {
+            if (readyFiles.size() >= maxFileNum) {
+                break;
+            }
+            readyFiles.add(pathname);
+        }
+        // sort by last-modified time (older -> newer)
+        Collections.sort(readyFiles, new FileTimeComparator());
+        for (File f : readyFiles) {
+            ret.add(f.getAbsolutePath());
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("unused")
+    private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
+            int maxFileNum) {
+        ArrayList<String> ret = new ArrayList<>();
+        ArrayList<String> directories = FilePathUtil
+                .getDirectoryLayers(logFileName);
+        String parentDir = directories.get(0) + File.separator
+                + directories.get(1);
+
+        Pattern pattern = Pattern.compile(directories.get(2),
+                Pattern.CASE_INSENSITIVE);
+        // listFiles() returns null when parentDir is missing or unreadable
+        File[] candidates = new File(parentDir).listFiles();
+        if (candidates == null) {
+            return ret;
+        }
+        for (File file : candidates) {
+            if (ret.size() >= maxFileNum) {
+                break;
+            }
+            Matcher matcher = pattern.matcher(file.getName());
+            if (matcher.matches()) {
+                ret.add(file.getAbsolutePath());
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Manual debugging entry point using a hard-coded Windows path.
+     * NOTE(review): looks like a leftover from local testing; consider removing it.
+     */
+    public static void main(String[] args) {
+        ArrayList<String> fileList = FileScanner.getUpdatedOrNewFiles(
+                "f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100);
+        for (String fileName : fileList) {
+            System.out.println(fileName);
+        }
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
new file mode 100644
index 0000000000..bde4a33361
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.file.Task;
+import 
org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.file.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Watch directory, if new valid files are created, create jobs 
correspondingly.
+ */
+public class LogFileCollectTask extends Task {
+
+    public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance";
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogFileCollectTask.class);
+    private TaskProfile taskProfile;
+    private Db basicDb;
+    private TaskManager taskManager;
+    private InstanceManager instanceManager;
+    // watchers keyed by origin file pattern
+    private final Map<String, WatchEntity> watchers = new ConcurrentHashMap<>();
+    // origin patterns whose base directory did not exist when the watcher was to be created
+    private final Set<String> watchFailedDirs = new HashSet<>();
+    private final Map<String/* dataTime */, Map<String/* fileName */, InstanceProfile>> eventMap =
+            new ConcurrentHashMap<>();
+    // two days in milliseconds; long arithmetic so the constant cannot overflow if it is ever enlarged
+    public static final long DAY_TIMEOUT_INTERVAL = 2L * 24 * 3600 * 1000;
+    public static final int CORE_THREAD_SLEEP_TIME = 1000;
+    private boolean retry;
+    private long startTime;
+    private long endTime;
+    // set at the end of init() and polled by the run() loop, which presumably runs on another thread;
+    // volatile guarantees the loop observes the update
+    private volatile boolean initOK = false;
+    private Set<String> originPatterns;
+    private long lastScanTime = 0;
+    // interval between full scans of existing files in normal (non-retry) mode: one minute
+    public static final long SCAN_INTERVAL = 1 * 60 * 1000;
+    private volatile boolean runAtLeastOneTime = false;
+    private volatile boolean running = false;
+
+    @Override
+    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
+        if (!isProfileValid(taskProfile)) {
+            LOGGER.error("task profile invalid {}", taskProfile);
+            return;
+        }
+        taskManager = (TaskManager) srcManager;
+        commonInit(taskProfile, basicDb);
+        if (retry) {
+            retryInit();
+        } else {
+            watchInit();
+        }
+        initOK = true;
+    }
+
+    /**
+     * Stores the profile and db, reads the retry flag and comma-separated file patterns, and starts a
+     * dedicated instance manager. A start failure is logged and not propagated.
+     */
+    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+        this.taskProfile = taskProfile;
+        this.basicDb = basicDb;
+        retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
+        originPatterns = Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
+                .collect(Collectors.toSet());
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), basicDb);
+        try {
+            instanceManager.start();
+        } catch (Exception e) {
+            // pass the exception as the last argument so SLF4J also logs its stack trace
+            LOGGER.error("start instance manager error {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Checks that the profile has every required key, the file-collection keys and, for retry
+     * tasks, a non-zero start and end time.
+     *
+     * @param profile profile to validate
+     * @return true when the profile can be used by this task
+     */
+    private boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+        boolean missingFileKey = !profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
+                || !profile.hasKey(TaskConstants.TASK_FILE_TIME_OFFSET)
+                || !profile.hasKey(TaskConstants.FILE_MAX_NUM);
+        if (missingFileKey) {
+            LOGGER.error("task profile needs file keys");
+            return false;
+        }
+        if (!profile.getBoolean(TaskConstants.TASK_RETRY, false)) {
+            return true;
+        }
+        long retryStart = profile.getLong(TaskConstants.TASK_START_TIME, 0);
+        long retryEnd = profile.getLong(TaskConstants.TASK_END_TIME, 0);
+        boolean timeRangeSet = retryStart != 0 && retryEnd != 0;
+        if (!timeRangeSet) {
+            LOGGER.error("retry task time error start {} end {}", retryStart, retryEnd);
+        }
+        return timeRangeSet;
+    }
+
+    /* Load the retry window; isProfileValid already rejected retry profiles with a zero bound. */
+    private void retryInit() {
+        this.startTime = taskProfile.getLong(TaskConstants.TASK_START_TIME, 0);
+        this.endTime = taskProfile.getLong(TaskConstants.TASK_END_TIME, 0);
+    }
+
+    /* Create a watcher for every configured origin pattern. */
+    private void watchInit() {
+        for (String originPattern : originPatterns) {
+            addPathPattern(originPattern);
+        }
+    }
+
+    public void addPathPattern(String originPattern) {
+        ArrayList<String> directories = 
FilePathUtil.getDirectoryLayers(originPattern);
+        String basicStaticPath = directories.get(0);
+        LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, 
basicStaticPath});
+        /* Remember the failed watcher creations. */
+        if (!new File(basicStaticPath).exists()) {
+            LOGGER.warn(AgentErrMsg.DIRECTORY_NOT_FOUND_ERROR + 
basicStaticPath);
+            watchFailedDirs.add(originPattern);
+            return;
+        }
+        try {
+            /*
+             * When we construct the watch object, we should do some work with 
the data name, replace yyyy to 4 digits
+             * regression, mm to 2 digits regression, also because of 
difference between java regular expression and
+             * linux regular expression, we have to replace * to ., and 
replace . with \\. .
+             */
+            WatchService watchService = 
FileSystems.getDefault().newWatchService();
+            WatchEntity entity = new WatchEntity(watchService, originPattern, 
taskProfile.getCycleUnit(),
+                    taskProfile.getTimeOffset());
+            entity.registerRecursively();
+            watchers.put(originPattern, entity);
+            watchFailedDirs.remove(originPattern);
+        } catch (IOException e) {
+            if (e.toString().contains("Too many open files") || 
e.toString().contains("打开的文件过多")) {
+                LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString());
+            } else {
+                LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e);
+            }
+        }
+    }
+
+    /**
+     * Stops this task: marks it SUCCEEDED, stops the instance manager when one was created, and then
+     * closes all watch services via releaseWatchers, which first waits for the run() loop to exit.
+     * NOTE(review): presumably the state change is what makes isFinished() true so that run() can
+     * leave its loop; if not, releaseWatchers would wait forever - confirm in Task.
+     */
+    @Override
+    public void destroy() {
+        doChangeState(State.SUCCEEDED);
+        // null when init() rejected the profile before commonInit() created the manager
+        if (instanceManager != null) {
+            instanceManager.stop();
+        }
+        releaseWatchers(watchers);
+    }
+
+    /**
+     * Closes the watch service of every watcher, after waiting for the run() loop to exit.
+     * Close failures are logged and do not stop the remaining watchers from being closed.
+     *
+     * @param watchers watchers keyed by origin file pattern
+     */
+    private void releaseWatchers(Map<String, WatchEntity> watchers) {
+        // wait until run() has left its loop before closing the watch services it may still use
+        while (running) {
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+        }
+        watchers.forEach((originPattern, watcher) -> {
+            try {
+                watcher.getWatchService().close();
+            } catch (IOException e) {
+                // SLF4J only logs the stack trace when the throwable is the last argument; the key of
+                // this map is the origin pattern, not the task id
+                LOGGER.error("close watch service failed taskId {} pattern {}", getTaskId(), originPattern, e);
+            }
+        });
+    }
+
+    @Override
+    public TaskProfile getProfile() {
+        return taskProfile;
+    }
+
+    @Override
+    public String getTaskId() {
+        return taskProfile.getTaskId();
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("directory-task-core-" + getTaskId());
+        running = true;
+        while (!isFinished()) {
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            if (!initOK) {
+                continue;
+            }
+            if (retry) {
+                runForRetry();
+            } else {
+                runForNormal();
+            }
+        }
+        running = false;
+    }
+
+    private void runForRetry() {
+        if (!runAtLeastOneTime) {
+            scanExistingFile();
+            dealWithEvenMap();
+            runAtLeastOneTime = true;
+        }
+        if (instanceManager.allInstanceFinished()) {
+            LOGGER.info("retry task finished, send action to task manager, 
taskId {}", getTaskId());
+            TaskAction action = new 
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
+            taskManager.submitAction(action);
+            doChangeState(State.SUCCEEDED);
+        }
+    }
+
+    private void runForNormal() {
+        if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+            scanExistingFile();
+            lastScanTime = AgentUtils.getCurrentTime();
+        }
+        runForWatching();
+        dealWithEvenMap();
+    }
+
+    private void scanExistingFile() {
+        originPatterns.forEach((originPattern) -> {
+            List<BasicFileInfo> fileInfos = 
scanExistingFileByPattern(originPattern);
+            LOGGER.debug("scan {} get file count {}", originPattern, 
fileInfos.size());
+            fileInfos.forEach((fileInfo) -> {
+                addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+            });
+        });
+    }
+
+    private List<BasicFileInfo> scanExistingFileByPattern(String 
originPattern) {
+        long startScanTime = startTime;
+        long endScanTime = endTime;
+        if (!retry) {
+            long currentTime = System.currentTimeMillis();
+            // only scan two cycle, like two hours or two days
+            long offset = NewDateUtils.caclOffset("-2" + 
taskProfile.getCycleUnit());
+            startScanTime = currentTime - offset;
+            endScanTime = currentTime;
+        }
+        return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern, 
startScanTime, endScanTime, retry);
+    }
+
+    private void runForWatching() {
+        /* Deal with those failed watcher creation tasks. */
+        Set<String> tmpWatchFailedDirs = new HashSet<>(watchFailedDirs);
+        for (String originPattern : tmpWatchFailedDirs) {
+            addPathPattern(originPattern);
+        }
+        /*
+         * Visit the watchers to see if it gets any new file creation, if it 
exists and fits the file name pattern, add
+         * it to the task list.
+         */
+        for (Map.Entry<String, WatchEntity> entry : watchers.entrySet()) {
+            dealWithWatchEntity(entry.getKey());
+        }
+    }
+
+    private void dealWithEvenMap() {
+        removeTimeoutEven(eventMap, retry);
+        for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
+            Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue();
+            // 根据event的数据时间、业务的周期、偏移量计算出该event是否需要在当前时间处理
+            String dataTime = entry.getKey();
+            String shouldStartTime =
+                    NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
+            String currentTime = getCurrentTime();
+            LOGGER.info("taskId {}, dataTime {}, currentTime {}, 
shouldStartTime {}",
+                    new Object[]{getTaskId(), dataTime, currentTime, 
shouldStartTime});
+            if (currentTime.compareTo(shouldStartTime) >= 0) {
+                /* These codes will sort the FileCreationEvents by create 
time. */
+                Set<InstanceProfile> sortedEvents = new 
TreeSet<>(sameDataTimeEvents.values());
+                /* Check the file end with event creation time in asc order. */
+                for (InstanceProfile sortEvent : sortedEvents) {
+                    String fileName = sortEvent.getInstanceId();
+                    InstanceProfile profile = sameDataTimeEvents.get(fileName);
+                    InstanceAction action = new InstanceAction(ActionType.ADD, 
profile);
+                    while (!instanceManager.submitAction(action)) {
+                        LOGGER.error("instance manager action queue is full: 
taskId {}", instanceManager.getTaskId());
+                        AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    }
+                    sameDataTimeEvents.remove(fileName);
+                }
+            }
+        }
+    }
+
+    private void removeTimeoutEven(Map<String, Map<String, InstanceProfile>> 
eventMap, boolean isRetry) {
+        if (isRetry) {
+            return;
+        }
+        for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
+            // 如果event的数据时间在当前时间前(后)2天之内,则有效
+            String dataTime = entry.getKey();
+            if (!NewDateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
+                /* Remove it from memory map. */
+                eventMap.remove(dataTime);
+                LOGGER.warn("remove too old event from event map. dataTime 
{}", dataTime);
+            }
+        }
+    }
+
+    private String getCurrentTime() {
+        SimpleDateFormat dateFormat = new 
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+        TimeZone timeZone = 
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+        dateFormat.setTimeZone(timeZone);
+        return dateFormat.format(new Date(System.currentTimeMillis()));
+    }
+
+    public synchronized void dealWithWatchEntity(String originPattern) {
+        WatchEntity entity = watchers.get(originPattern);
+        if (entity == null) {
+            LOGGER.error("Can't find the watch entity for originPattern: " + 
originPattern);
+            return;
+        }
+        try {
+            /* Get all creation events until all events are consumed. */
+            for (int i = 0; i < entity.getTotalPathSize(); i++) {
+                // maybe the watchService is closed ,but we catch this 
exception!
+                final WatchKey key = entity.getWatchService().poll();
+                if (key == null) {
+                    return;
+                }
+                dealWithWatchKey(entity, key);
+            }
+        } catch (Exception e) {
+            LOGGER.error("deal with creation event error: ", e);
+        }
+    }
+
+    private void dealWithWatchKey(WatchEntity entity, WatchKey key) throws 
IOException {
+        Path contextPath = entity.getPath(key);
+        LOGGER.info("Find creation events in path: " + 
contextPath.toAbsolutePath());
+        for (WatchEvent<?> watchEvent : key.pollEvents()) {
+            Path child = resolvePathFromEvent(watchEvent, contextPath);
+            if (child == null) {
+                continue;
+            }
+            if (Files.isDirectory(child)) {
+                LOGGER.warn("The find creation event is triggered by a 
directory: " + child
+                        .getFileName());
+                entity.registerRecursively(child);
+                continue;
+            }
+            handleFilePath(child, entity);
+        }
+        resetWatchKey(entity, key, contextPath);
+    }
+
+    private Path resolvePathFromEvent(WatchEvent<?> watchEvent, Path 
contextPath) {
+        final Kind<?> kind = watchEvent.kind();
+        /*
+         * Can't simply continue when it detects that an event maybe ignored.
+         */
+        if (kind == StandardWatchEventKinds.OVERFLOW) {
+            LOGGER.error("An event is unclear and lost");
+            /*
+             * TODO: should we do a full scan to avoid lost events?
+             */
+            return null;
+        }
+        final WatchEvent<Path> watchEventPath = (WatchEvent<Path>) watchEvent;
+        final Path eventPath = watchEventPath.context();
+        /*
+         * Must resolve, otherwise we can't get the file attributes.
+         */
+        return contextPath.resolve(eventPath);
+    }
+
+    private void handleFilePath(Path filePath, WatchEntity entity) {
+        String newFileName = filePath.toFile().getAbsolutePath();
+        LOGGER.info("[New File] {} {}", newFileName, 
entity.getOriginPattern());
+        Matcher matcher = entity.getPattern().matcher(newFileName);
+        if (matcher.matches() || matcher.lookingAt()) {
+            LOGGER.info("[Matched File] {} {}", newFileName, 
entity.getOriginPattern());
+            String dataTime = getDataTimeFromFileName(newFileName, 
entity.getOriginPattern(),
+                    entity.getDateExpression());
+            if (!checkFileNameForTime(newFileName, entity)) {
+                LOGGER.error(AgentErrMsg.FILE_ERROR + "File Timeout {} {}", 
newFileName, dataTime);
+                return;
+            }
+            addToEvenMap(newFileName, dataTime);
+        }
+    }
+
+    private void addToEvenMap(String fileName, String dataTime) {
+        Long lastModifyTime = FileUtils.getFileLastModifyTime(fileName);
+        if (!instanceManager.shouldAddAgain(fileName, lastModifyTime)) {
+            LOGGER.info("file {} has record in db", fileName);
+            return;
+        }
+        Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.computeIfAbsent(dataTime,
+                mapKey -> new ConcurrentHashMap<>());
+        boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
+        if (containsInMemory) {
+            LOGGER.error("should not happen! may be {} has been deleted and 
add again", fileName);
+            return;
+        }
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
+                fileName, dataTime);
+        sameDataTimeEvents.put(fileName, instanceProfile);
+    }
+
+    private boolean checkFileNameForTime(String newFileName, WatchEntity 
entity) {
+        /* Get the data time for this file. */
+        PathDateExpression dateExpression = entity.getDateExpression();
+        if (dateExpression.getLongestDatePattern().length() != 0) {
+            String dataTime = getDataTimeFromFileName(newFileName, 
entity.getOriginPattern(), dateExpression);
+            LOGGER.info("file {} ,fileTime {}", newFileName, dataTime);
+            if (!NewDateUtils.isValidCreationTime(dataTime, 
entity.getCycleUnit(),
+                    entity.getTimeOffset())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private String getDataTimeFromFileName(String fileName, String 
originPattern, PathDateExpression dateExpression) {
+        /*
+         * TODO: what if this file doesn't have any date pattern regex.
+         *
+         * For this case, we can simple think that the next file creation 
means the last task of this conf should finish
+         * reading and start reading this new file.
+         */
+        // 从文件名称中提取数据时间
+        String fileTime = NewDateUtils.getDateTime(fileName, originPattern, 
dateExpression);
+
+        /**
+         * 将文件时间中任意非数字字符替换掉
+         * 如2015-09-16_00替换成2015091600
+         */
+        return fileTime.replaceAll("\\D", "");
+    }
+
+    private void resetWatchKey(WatchEntity entity, WatchKey key, Path 
contextPath) {
+        key.reset();
+        /*
+         * Register a new watch service on the path if the old watcher is 
invalid.
+         */
+        if (!key.isValid()) {
+            LOGGER.warn(AgentErrMsg.WATCHER_INVALID + "Invalid Watcher {}",
+                    contextPath.getFileName());
+            try {
+                WatchService oldService = entity.getWatchService();
+                oldService.close();
+                WatchService watchService = 
FileSystems.getDefault().newWatchService();
+                entity.clearKeys();
+                entity.clearPathToKeys();
+                entity.setWatchService(watchService);
+                entity.registerRecursively();
+            } catch (IOException e) {
+                LOGGER.error("Restart a new watcher runs into error: ", e);
+            }
+        }
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java
new file mode 100644
index 0000000000..9bda67b85a
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
/**
 * Kinds of tasks, each paired with an explicit integer code (independent of {@code ordinal()}).
 */
public enum TaskType {

    READER(0),
    TAILER(1),
    UPLOADER(2),
    STATE(3),
    OTHER(4),
    DB(5),
    GAIAReader(6);

    /** Integer code of this task type; set once at construction. */
    private final int type;

    TaskType(int type) {
        this.type = type;
    }

    /**
     * Returns the integer code of this task type.
     *
     * @return the code passed to the constructor
     */
    public int getType() {
        return type;
    }
}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
new file mode 100644
index 0000000000..ffd74c0100
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.filecollect;
+
+import org.apache.inlong.agent.plugin.utils.file.DateUtils;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
+import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class WatchEntity {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(WatchEntity.class);
+    private WatchService watchService;
+    private final String basicStaticPath;
+    private final String originPattern;
+    private final String regexPattern;
+    private final Pattern pattern;
+    private final PathDateExpression dateExpression;
+    private final String originPatternWithoutFileName;
+    private final Pattern patternWithoutFileName;
+    private final boolean containRegexPattern;
+    private final Map<WatchKey, Path> keys = new ConcurrentHashMap<WatchKey, 
Path>();
+    private final Map<String, WatchKey> pathToKeys = new 
ConcurrentHashMap<String, WatchKey>();
+    private final String dirSeparator = System.getProperty("file.separator");
+    private String cycleUnit;
+    private String timeOffset;
+
+    public WatchEntity(WatchService watchService,
+            String originPattern,
+            String cycleUnit,
+            String timeOffset) {
+        this.watchService = watchService;
+        this.originPattern = originPattern;
+        ArrayList<String> directoryLayers = 
FilePathUtil.getDirectoryLayers(originPattern);
+        this.basicStaticPath = directoryLayers.get(0);
+        this.regexPattern = 
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
+        pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL | Pattern.MULTILINE);
+        ArrayList<String> directories = 
FilePathUtil.cutDirectory(originPattern);
+        this.originPatternWithoutFileName = directories.get(0);
+        this.patternWithoutFileName = Pattern
+                
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
+                        Pattern.CASE_INSENSITIVE | Pattern.DOTALL | 
Pattern.MULTILINE);
+        /*
+         * Get the longest data regex from the data name, it's used if we want 
to get out the data time from the file
+         * name.
+         */
+        this.dateExpression = 
DateUtils.extractLongestTimeRegexWithPrefixOrSuffix(originPattern);
+        this.containRegexPattern = isPathContainRegexPattern();
+        this.cycleUnit = cycleUnit;
+        this.timeOffset = timeOffset;
+        logger.info("add a new watchEntity {}", this);
+    }
+
+    @Override
+    public String toString() {
+        return "WatchEntity [parentPathName=" + basicStaticPath
+                + ", readFilePattern=" + regexPattern
+                + ", dateExpression=" + dateExpression + ", totalDirPattern="
+                + originPatternWithoutFileName + ", containRegexPattern="
+                + containRegexPattern + ", totalDirRegexPattern="
+                + patternWithoutFileName + ", keys=" + keys + ", pathToKeys=" 
+ pathToKeys
+                + ", watchService=" + watchService + "]";
+    }
+
+    private boolean isPathContainRegexPattern() {
+        if (originPatternWithoutFileName.contains("YYYY") || 
originPatternWithoutFileName.contains("MM")
+                || originPatternWithoutFileName.contains("DD") || 
originPatternWithoutFileName.contains("hh")) {
+            return true;
+        }
+
+        return false;
+    }
+
+    public boolean isContainRegexPattern() {
+        return containRegexPattern;
+    }
+
+    private int calcPathDepth(String rootDir, String dirName) {
+        // rootDir
+        return 0;
+    }
+
+    private void register(Path dir) throws IOException {
+
+        if (dir == null) {
+            return;
+        }
+
+        String dirName = dir.toAbsolutePath().toString();
+        logger.info(dirName);
+        Matcher matcher = patternWithoutFileName.matcher(dirName);
+        String rootDir = 
Paths.get(basicStaticPath).toAbsolutePath().toString();
+        Paths.get(basicStaticPath).toAbsolutePath().getNameCount();
+
+        // must use suffeix match
+        // consider /data/YYYYMMDD/abc/YYYYMMDDhh.*.txt this case
+        if (!pathToKeys.containsKey(dirName) && (matcher.matches() || 
rootDir.equals(dirName))) {
+            WatchKey key = dir.register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE);
+            keys.put(key, dir);
+            pathToKeys.put(dirName, key);
+
+            logger.info("Register a new directory: " + 
dir.toAbsolutePath().toString());
+        }
+    }
+
+    public void registerRecursively() throws IOException {
+        // register root dir
+        Path rootPath = Paths.get(basicStaticPath);
+        String rootDirName = rootPath.toAbsolutePath().toString();
+        if (!pathToKeys.containsKey(rootDirName)) {
+            WatchKey key = rootPath.register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE);
+            keys.put(key, rootPath);
+            pathToKeys.put(rootDirName, key);
+            logger.info("Register a new directory: " + rootDirName);
+        }
+        registerRecursively(rootPath.toFile(), 
rootPath.toAbsolutePath().toString().length() + 1);
+    }
+
+    public void registerRecursively(Path dir) throws IOException {
+        Path rootPath = dir;
+        String rootDirName = rootPath.toAbsolutePath().toString();
+        int beginIndex = rootDirName.lastIndexOf(dirSeparator) + 1;
+        if (beginIndex == 0) {
+            return;
+        }
+        int index = originPatternWithoutFileName.indexOf(dirSeparator, 
beginIndex + 1);
+        Pattern pattern = getPattern(index);
+        logger.info("beginIndex {} ,index {} ,dirPattern {}",
+                new Object[]{beginIndex, index, pattern.pattern()});
+        if (!pathToKeys.containsKey(rootDirName) && match(pattern, 
rootDirName)) {
+            WatchKey key = rootPath.register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE);
+            keys.put(key, rootPath);
+            pathToKeys.put(rootDirName, key);
+            logger.info("Register a new directory: " + rootDirName);
+        } else {
+            return;
+        }
+
+        logger.info("rootPath len {}", 
rootPath.toAbsolutePath().toString().length());
+
+        registerRecursively(rootPath.toFile(), 
rootPath.toAbsolutePath().toString().length() + 1);
+    }
+
+    public void registerRecursively(File dir, int beginIndex) throws 
IOException {
+        File[] files = dir.listFiles();
+        if (files == null) {
+            return;
+        }
+        int index = originPatternWithoutFileName.indexOf(dirSeparator, 
beginIndex);
+        Pattern pattern = getPattern(index);
+        logger.info("beginIndex {} ,index {} ,dirPattern {}",
+                new Object[]{beginIndex, index, pattern.pattern()});
+        for (int i = 0; i < files.length; i++) {
+            if (files[i].isDirectory()) {
+                String dirName = files[i].toString();
+                Path dirPath = Paths.get(dirName);
+                if (!pathToKeys.containsKey(dirName) && match(pattern, 
dirName)) {
+                    try {
+                        WatchKey key = dirPath
+                                .register(watchService, 
StandardWatchEventKinds.ENTRY_CREATE);
+                        keys.put(key, dirPath);
+                        pathToKeys.put(dirName, key);
+                        logger.info("Register a new directory: " + dirName);
+                    } catch (IOException e) {
+                        /**
+                         * 捕获异常,不能注册的子目录就忽略。
+                         */
+                        logger.error("Register directory {} error, skip it. ", 
dirName, e);
+                        continue;
+                    }
+                    registerRecursively(files[i].getAbsoluteFile(),
+                            files[i].getAbsolutePath().length() + 1);
+                }
+            }
+        }
+    }
+
+    private Pattern getPattern(int index) {
+        String dirPattern = "";
+        if (index == -1) {
+            dirPattern = originPatternWithoutFileName;
+        } else {
+            dirPattern = originPatternWithoutFileName.substring(0, index);
+        }
+        Pattern pattern = 
Pattern.compile(NewDateUtils.replaceDateExpressionWithRegex(dirPattern),
+                Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+        return pattern;
+    }
+
+    private boolean match(Pattern pattern, String dirName) {
+        Matcher matcher = pattern.matcher(dirName);
+        return matcher.matches() || matcher.lookingAt();
+    }
+
+    public Path getPath(WatchKey key) {
+        return keys.get(key);
+    }
+
+    public int getTotalPathSize() {
+        return keys.size();
+    }
+
+    public String getWatchPath() {
+        return basicStaticPath;
+    }
+
+    public WatchService getWatchService() {
+        return watchService;
+    }
+
+    public void setWatchService(WatchService watchService) {
+        this.watchService = watchService;
+    }
+
+    public String getRegexPattern() {
+        return regexPattern;
+    }
+
+    public PathDateExpression getDateExpression() {
+        return dateExpression;
+    }
+
+    public String getLongestDatePattern() {
+        return dateExpression.getLongestDatePattern();
+    }
+
+    public NonRegexPatternPosition getPatternPosition() {
+        return dateExpression.getPatternPosition();
+    }
+
+    /*
+     * Remove the watched path which is 3 cycle units earlier than current 
task data time, this is because JDK7 starts a
+     * thread for each watch path, which should consume lots of memory.
+     */
+    public void removeUselessWatchDirectories(String curDataTime)
+            throws Exception {
+
+        logger.info("removeUselessWatchDirectories {}", curDataTime);
+
+        /* Calculate the data time which is 3 cycle units earlier than current 
task data time. */
+        long curDataTimeMillis = 
NewDateUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(curDataTimeMillis);
+        if ("D".equalsIgnoreCase(cycleUnit)) {
+            calendar.add(Calendar.DAY_OF_YEAR, -3);
+        } else if ("h".equalsIgnoreCase(cycleUnit)) {
+            calendar.add(Calendar.HOUR_OF_DAY, -3);
+        } else if ("10m".equalsIgnoreCase(cycleUnit)) {
+            calendar.add(Calendar.MINUTE, -30);
+        }
+
+        /* Calculate the 3 cycle units earlier date. */
+        String year = String.valueOf(calendar.get(Calendar.YEAR));
+        String month = String.valueOf(calendar.get(Calendar.MONTH) + 1);
+        if (month.length() < 2) {
+            month = "0" + month;
+        }
+        String day = String.valueOf(calendar.get(Calendar.DAY_OF_MONTH));
+        if (day.length() < 2) {
+            day = "0" + day;
+        }
+        String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
+        if (hour.length() < 2) {
+            hour = "0" + hour;
+        }
+        String minute = String.valueOf(calendar.get(Calendar.MINUTE));
+        if (minute.length() < 2) {
+            minute = "0" + minute;
+        }
+
+        /* Replace it with the date and get a specified watch path. */
+        String copyDirPattern = new String(originPatternWithoutFileName);
+        copyDirPattern = copyDirPattern.replace("YYYY", year);
+        copyDirPattern = copyDirPattern.replace("MM", month);
+        copyDirPattern = copyDirPattern.replace("DD", day);
+        copyDirPattern = copyDirPattern.replace("hh", hour);
+        copyDirPattern = copyDirPattern.replace("mm", minute);
+
+        Set<String> keys = pathToKeys.keySet();
+        Set<String> tmpKeys = new HashSet<>();
+        tmpKeys.addAll(keys);
+        String rootDir = 
Paths.get(basicStaticPath).toAbsolutePath().toString();
+        for (String path : tmpKeys) {
+            /*
+             * Remove the watch path whose path is smaller than the 3 cycle 
units earlier.
+             */
+            logger.info("[Path]{}  {}", path, copyDirPattern);
+            if (path.compareTo(copyDirPattern) < 0 && 
!copyDirPattern.contains(path)) {
+                WatchKey key = pathToKeys.get(path);
+                key.cancel();
+
+                pathToKeys.remove(path);
+
+                logger.info("Watch path: {} is too old for data time: {}, we 
should remove", path,
+                        curDataTime);
+            }
+        }
+    }
+
+    public void clearPathToKeys() {
+        pathToKeys.clear();
+    }
+
+    public void clearKeys() {
+        keys.clear();
+    }
+
+    public String getCycleUnit() {
+        return cycleUnit;
+    }
+
+    public String getTimeOffset() {
+        return timeOffset;
+    }
+
+    public String getOriginPattern() {
+        return originPattern;
+    }
+
+    public Pattern getPattern() {
+        return pattern;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 610dbc7574..7850678acd 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -18,13 +18,18 @@
 package org.apache.inlong.agent.plugin;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
@@ -34,7 +39,8 @@ import java.nio.file.Paths;
 public class AgentBaseTestsHelper {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AgentBaseTestsHelper.class);
-
+    private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
+    private static final Gson GSON = gsonBuilder.create();
     private final String className;
     private Path testRootDir;
     private Path parentPath;
@@ -45,8 +51,8 @@ public class AgentBaseTestsHelper {
 
     public AgentBaseTestsHelper setupAgentHome() {
         parentPath = Paths.get("./").toAbsolutePath();
-        testRootDir = Paths.get(parentPath + File.separator + "logs",
-                AgentBaseTestsHelper.class.getSimpleName(), className);
+        testRootDir = Paths.get(System.getProperty("java.io.tmpdir"),
+                AgentBaseTestsHelper.class.getSimpleName(), className);
         teardownAgentHome();
         boolean result = testRootDir.toFile().mkdirs();
         LOGGER.info("try to create {}, result is {}", testRootDir, result);
@@ -54,14 +60,14 @@ public class AgentBaseTestsHelper {
         return this;
     }
 
-    public Path getTestRootDir() {
-        return testRootDir.toAbsolutePath();
-    }
-
     public Path getParentPath() {
         return parentPath;
     }
 
+    public Path getTestRootDir() {
+        return testRootDir;
+    }
+
     public void teardownAgentHome() {
         if (testRootDir != null) {
             try {
@@ -71,4 +77,32 @@ public class AgentBaseTestsHelper {
             }
         }
     }
+
+    /** Builds a file-collect {@link TaskProfile} for tests by converting a generated {@link DataConfig}. */
+    public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
+            TaskStateEnum state) {
+        DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state);
+        return TaskProfile.convertToTaskProfile(dataConfig);
+    }
+
+    private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
+            TaskStateEnum state) {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setInlongGroupId("testGroupId"); // legacy field: groupId
+        dataConfig.setInlongStreamId("testStreamId"); // legacy field: streamId
+        dataConfig.setDataReportType(1); // legacy field: reportType
+        dataConfig.setTaskType(3); // legacy field: task type, 3 means file collection
+        dataConfig.setTaskId(taskId); // legacy field: task id
+        dataConfig.setState(state.ordinal()); // new field: task state (1 = normal, 2 = paused), sent as ordinal
+        FileTaskConfig fileTaskConfig = new FileTaskConfig();
+        fileTaskConfig.setPattern(pattern); // file path pattern (regular expression)
+        fileTaskConfig.setTimeOffset("0d"); // legacy field: time offset ("-1d" = a day ago, "-2h" = 2 hours ago)
+        fileTaskConfig.setMaxFileCount(100); // maximum number of files
+        fileTaskConfig.setCycleUnit("D"); // new field: task cycle unit, "D" = day, "h" = hour
+        fileTaskConfig.setRetry(retry); // new field: true if this is a retry (back-fill) task
+        fileTaskConfig.setStartTime(startTime);
+        fileTaskConfig.setEndTime(endTime);
+        dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
+        return dataConfig;
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java
new file mode 100644
index 0000000000..a989f3bfaf
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+public class MockInstanceManager {
+    // Minimal test double: it has no state, and stop() intentionally does nothing.
+    public void stop() {
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
new file mode 100644
index 0000000000..8a9ed4c021
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import com.google.gson.Gson;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+/** Checks that LogFileCollectTask reports the 20230928_1.txt fixture through addToEvenMap exactly once. */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LogFileCollectTask.class)
+@PowerMockIgnore({"javax.management.*"})
+public class TestLogfileCollectTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestLogfileCollectTask.class);
+    private static final ClassLoader LOADER = TestLogfileCollectTask.class.getClassLoader();
+    private static LogFileCollectTask task;
+    private static AgentBaseTestsHelper helper;
+    private static final Gson GSON = new Gson();
+    private static TaskManager manager;
+    private static String resourceName;
+    private static volatile String fileName; // may be set from the task's executor thread
+    private static volatile String dataTime; // may be set from the task's executor thread
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("TestLogfileCollectTask"));
+
+    @BeforeClass
+    public static void setup() {
+        helper = new AgentBaseTestsHelper(TestLogfileCollectTask.class.getName()).setupAgentHome();
+        Db basicDb = TaskManager.initDb("/localdb");
+        resourceName = LOADER.getResource("test/20230928_1.txt").getPath();
+        String pattern = new File(resourceName).getParent() + "/YYYYMMDD_[0-9]+.txt";
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING);
+        try {
+            // Scan window 2023-09-20..2023-09-30 must contain 20230928, the date in the fixture name.
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            Date startDate = format.parse("2023-09-20 00:00:00");
+            Date endDate = format.parse("2023-09-30 00:00:00");
+            taskProfile.setLong(TaskConstants.TASK_START_TIME, startDate.getTime());
+            taskProfile.setLong(TaskConstants.TASK_END_TIME, endDate.getTime());
+            manager = new TaskManager();
+            task = PowerMockito.spy(new LogFileCollectTask());
+            PowerMockito.doAnswer(invocation -> {
+                fileName = invocation.getArgument(0);
+                dataTime = invocation.getArgument(1);
+                return null;
+            }).when(task, "addToEvenMap", Mockito.anyString(), Mockito.anyString());
+            task.init(manager, taskProfile, basicDb);
+            EXECUTOR_SERVICE.submit(task);
+        } catch (Exception e) {
+            LOGGER.error("source init error", e);
+            Assert.fail("source init error");
+        }
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        if (task != null) {
+            task.destroy();
+        }
+        EXECUTOR_SERVICE.shutdownNow();
+        helper.teardownAgentHome();
+    }
+
+    @Test
+    public void testTaskManager() throws Exception {
+        await().atMost(2, TimeUnit.SECONDS).until(() -> fileName != null && dataTime != null);
+        Assert.assertEquals(resourceName, fileName);
+        Assert.assertEquals("20230928", dataTime);
+        PowerMockito.verifyPrivate(task, Mockito.times(1))
+                .invoke("addToEvenMap", Mockito.anyString(), Mockito.anyString());
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/20230928_1.txt 
b/inlong-agent/agent-plugins/src/test/resources/test/20230928_1.txt
new file mode 100644
index 0000000000..780b09709f
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/20230928_1.txt
@@ -0,0 +1,3 @@
+hello line-end-symbol aa
+world line-end-symbol
+agent line-end-symbol

Reply via email to