This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 81fb821718 [INLONG-11506][Agent] Task start and end time using string type (#11507)
81fb821718 is described below

commit 81fb8217183b65c365cdaaf16b36a631c163286b
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Tue Nov 19 18:32:43 2024 +0800

    [INLONG-11506][Agent] Task start and end time using string type (#11507)
---
 .../apache/inlong/agent/conf/InstanceProfile.java  |   4 +-
 .../org/apache/inlong/agent/conf/TaskProfile.java  |   4 +-
 .../inlong/agent/constant/TaskConstants.java       |   6 +-
 .../org/apache/inlong/agent/pojo/FileTask.java     |   8 +-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |   4 +-
 .../inlong/agent/core/AgentBaseTestsHelper.java    |   8 +-
 .../agent/plugin/fetcher/ManagerFetcher.java       |  27 +--
 .../inlong/agent/plugin/instance/FileInstance.java |   2 +-
 .../inlong/agent/plugin/sources/LogFileSource.java |   2 +-
 .../inlong/agent/plugin/task/file/AgentErrMsg.java |  67 -------
 .../plugin/{utils => task}/file/FileDataUtils.java |   2 +-
 .../inlong/agent/plugin/task/file/FileScanner.java |  87 ++-------
 .../{utils => task}/file/FileTimeComparator.java   |   2 +-
 .../agent/plugin/{utils => task}/file/Files.java   |   2 +-
 .../inlong/agent/plugin/task/file/LogFileTask.java | 107 ++++++-----
 .../inlong/agent/plugin/task/file/WatchEntity.java |  15 +-
 .../plugin/utils/{file => regex}/DateUtils.java    | 195 +--------------------
 .../plugin/utils/{file => regex}/MatchPoint.java   |   2 +-
 .../plugin/utils/{file => regex}/NewDateUtils.java |  41 +----
 .../{file => regex}/NonRegexPatternPosition.java   |   2 +-
 .../utils/{file => regex}/PathDateExpression.java  |   2 +-
 .../FilePathUtil.java => regex/PatternUtil.java}   |  17 +-
 .../inlong/agent/plugin/utils/regex/Scanner.java   |  97 ++++++++++
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  |  12 +-
 .../agent/plugin/instance/TestInstanceManager.java |   2 +-
 .../inlong/agent/plugin/sinks/KafkaSinkTest.java   |   2 +-
 .../inlong/agent/plugin/sinks/PulsarSinkTest.java  |   2 +-
 .../sinks/filecollect/TestSenderManager.java       |   4 +-
 .../agent/plugin/sources/TestLogFileSource.java    |   4 +-
 .../agent/plugin/sources/TestRedisSource.java      |   2 +-
 .../agent/plugin/sources/TestSQLServerSource.java  |   2 +-
 .../inlong/agent/plugin/task/TestLogFileTask.java  |  28 ++-
 .../inlong/agent/plugin/task/TestTaskManager.java  |   6 +-
 .../inlong/agent/plugin/utils/TestUtils.java       |  29 ++-
 34 files changed, 266 insertions(+), 530 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 9e85872ff5..c9a3d6a022 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -36,10 +36,10 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
 
 /**
  * job profile which contains details describing properties of one job.
@@ -200,6 +200,6 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
     }
 
     public boolean isRetry() {
-        return getBoolean(TASK_RETRY, false);
+        return getBoolean(FILE_TASK_RETRY, false);
     }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 1f77433c9f..32450735e4 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -36,7 +36,7 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
 
 /**
@@ -82,7 +82,7 @@ public class TaskProfile extends AbstractConfiguration {
     }
 
     public boolean isRetry() {
-        return getBoolean(TASK_RETRY, false);
+        return getBoolean(FILE_TASK_RETRY, false);
     }
 
     public String getTaskClass() {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 5dbc353d25..22fb87e6e5 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -62,9 +62,9 @@ public class TaskConstants extends CommonConstants {
     public static final String SOURCE_DATA_CONTENT_STYLE = 
"task.fileTask.dataContentStyle";
     public static final String SOURCE_DATA_SEPARATOR = 
"task.fileTask.dataSeparator";
     public static final String SOURCE_FILTER_STREAMS = 
"task.fileTask.filterStreams";
-    public static final String TASK_RETRY = "task.fileTask.retry";
-    public static final String TASK_START_TIME = "task.fileTask.startTime";
-    public static final String TASK_END_TIME = "task.fileTask.endTime";
+    public static final String FILE_TASK_RETRY = "task.fileTask.retry";
+    public static final String FILE_TASK_TIME_FROM = 
"task.fileTask.dataTimeFrom";
+    public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
     public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
     public static final String PREDEFINE_FIELDS = "task.predefinedFields";
     public static final String TASK_AUDIT_VERSION = "task.auditVersion";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index 57c294f7d4..54c191ffca 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -29,8 +29,8 @@ public class FileTask {
     private Integer id;
     private String cycleUnit;
     private Boolean retry;
-    private Long startTime;
-    private Long endTime;
+    private String dataTimeFrom;
+    private String dataTimeTo;
     private String timeOffset;
     private String timeZone;
     private String addictiveString;
@@ -91,9 +91,9 @@ public class FileTask {
 
         private Boolean retry;
 
-        private Long startTime;
+        private String dataTimeFrom;
 
-        private Long endTime;
+        private String dataTimeTo;
 
         private String pattern;
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 85c636c885..2d4a6a32ae 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -166,8 +166,8 @@ public class TaskProfileDto {
         fileTask.setMaxFileCount(taskConfig.getMaxFileCount());
         fileTask.setRetry(taskConfig.getRetry());
         fileTask.setCycleUnit(taskConfig.getCycleUnit());
-        fileTask.setStartTime(taskConfig.getStartTime());
-        fileTask.setEndTime(taskConfig.getEndTime());
+        fileTask.setDataTimeFrom(taskConfig.getDataTimeFrom());
+        fileTask.setDataTimeTo(taskConfig.getDataTimeTo());
         if (taskConfig.getFilterStreams() != null) {
             
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
         }
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index fa37fa2f1c..a8dfbdf91a 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -75,14 +75,14 @@ public class AgentBaseTestsHelper {
         }
     }
 
-    public TaskProfile getTaskProfile(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
+    public TaskProfile getTaskProfile(int taskId, String pattern, boolean 
retry, String startTime, String endTime,
             TaskStateEnum state, String timeZone) {
         DataConfig dataConfig = getDataConfig(taskId, pattern, retry, 
startTime, endTime, state, timeZone);
         TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
         return profile;
     }
 
-    private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
+    private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, String startTime, String endTime,
             TaskStateEnum state, String timeZone) {
         DataConfig dataConfig = new DataConfig();
         dataConfig.setInlongGroupId("testGroupId");
@@ -98,8 +98,8 @@ public class AgentBaseTestsHelper {
         fileTaskConfig.setMaxFileCount(100);
         fileTaskConfig.setCycleUnit("h");
         fileTaskConfig.setRetry(retry);
-        fileTaskConfig.setStartTime(startTime);
-        fileTaskConfig.setEndTime(endTime);
+        fileTaskConfig.setDataTimeFrom(startTime);
+        fileTaskConfig.setDataTimeTo(endTime);
         dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
         return dataConfig;
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index e0125751c3..01d7128f8b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -42,10 +42,7 @@ import com.google.gson.JsonObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
@@ -225,26 +222,16 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     private TaskResult getTestConfig(String testDir, int normalTaskId, int 
retryTaskId, int state) {
         List<DataConfig> configs = new ArrayList<>();
-        String startStr = "2023-07-10 00:00:00";
-        String endStr = "2023-07-22 00:00:00";
-        Long start = 0L;
-        Long end = 0L;
         String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
         String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
-        try {
-            Date parse = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").parse(startStr);
-            start = parse.getTime();
-            parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr);
-            end = parse.getTime();
-        } catch (ParseException e) {
-            e.printStackTrace();
-        }
-        configs.add(getTestDataConfig(normalTaskId, normalPattern, false, 
start, end, CycleUnitType.MINUTE, state));
-        configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, 
end, CycleUnitType.DAY, state));
+        configs.add(getTestDataConfig(normalTaskId, normalPattern, false, 
"202307100000", "202307220000",
+                CycleUnitType.MINUTE, state));
+        configs.add(
+                getTestDataConfig(retryTaskId, retryPattern, true, "20230710", 
"20230722", CycleUnitType.DAY, state));
         return TaskResult.builder().dataConfigs(configs).build();
     }
 
-    private DataConfig getTestDataConfig(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
+    private DataConfig getTestDataConfig(int taskId, String pattern, boolean 
retry, String startTime, String endTime,
             String cycleUnit, int state) {
         DataConfig dataConfig = new DataConfig();
         dataConfig.setInlongGroupId("devcloud_group_id");
@@ -260,8 +247,8 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         fileTaskConfig.setMaxFileCount(100);
         fileTaskConfig.setCycleUnit(cycleUnit);
         fileTaskConfig.setRetry(retry);
-        fileTaskConfig.setStartTime(startTime);
-        fileTaskConfig.setEndTime(endTime);
+        fileTaskConfig.setDataTimeFrom(startTime);
+        fileTaskConfig.setDataTimeTo(endTime);
         fileTaskConfig.setDataContentStyle("CSV");
         fileTaskConfig.setDataSeparator("|");
         dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 3c86a4c33f..38cb969376 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
 
 import java.io.IOException;
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 9ce20f6daa..5aebbc7d86 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -28,7 +28,7 @@ import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import 
org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java
deleted file mode 100644
index aa7e5c734f..0000000000
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.task.file;
-
-public class AgentErrMsg {
-
-    public static final String CONFIG_SUCCESS = "SUCCESS";
-
-    // data source config error */
-    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-INLONG_AGENT|10001|ERROR"
-            + "|ERROR_DATA_SOURCE_CONFIG|";
-
-    // directory not found error */
-    public static final String DIRECTORY_NOT_FOUND_ERROR = 
"ERROR-0-INLONG_AGENT|11001|WARN"
-            + "|WARN_DIRECTORY_NOT_EXIST|";
-
-    // watch directory error */
-    public static final String WATCH_DIR_ERROR = 
"ERROR-0-INLONG_AGENT|11002|ERROR"
-            + "|ERROR_WATCH_DIR_ERROR|";
-
-    // file error(not found,rotate)
-    public static final String FILE_ERROR = 
"ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|";
-
-    // read file error
-    public static final String FILE_OP_ERROR = 
"ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|";
-
-    // disk full
-    public static final String DISK_FULL = 
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|";
-
-    // out of memory
-    public static final String OOM_ERROR = 
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|";
-
-    // watcher error
-    public static final String WATCHER_INVALID = 
"ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|";
-
-    // could not connect to manager
-    public static final String CONNECT_MANAGER_ERROR = 
"ERROR-1-INLONG_AGENT|30002|ERROR"
-            + "|ERROR_CANNOT_CONNECT_TO_MANAGER|";
-
-    // send data to dataProxy failed
-    public static final String SEND_TO_BUS_ERROR = 
"ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|";
-
-    // operate bdb error
-    public static final String BDB_ERROR = 
"ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|";
-
-    // buffer full
-    public static final String MSG_BUFFER_FULL = 
"ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|";
-
-    // found event invalid(task has been delete)
-    public static final String FOUND_EVENT_INVALID = 
"ERROR-1-INLONG_AGENT|30003|ERROR"
-            + "|FOUND_EVENT_INVALID|";
-}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
similarity index 96%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
index 57b4702848..d65960ab17 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.task.file;
 
 import java.io.IOException;
 import java.nio.file.Files;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
index e37b6deb89..7fa759a9d2 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.agent.plugin.task.file;
 
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
-import org.apache.inlong.agent.plugin.utils.file.Files;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
 import org.apache.inlong.agent.utils.DateTransUtils;
 
 import org.slf4j.Logger;
@@ -28,11 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Collections;
 import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
 
@@ -52,57 +48,24 @@ public class FileScanner {
             this.fileName = fileName;
             this.dataTime = dataTime;
         }
-
     }
 
     private static final Logger logger = 
LoggerFactory.getLogger(FileScanner.class);
 
-    public static List<String> getDataTimeList(long startTime, long endTime, 
String cycleUnit, String timeOffset,
-            boolean isRetry) {
-        if (!isRetry) {
-            startTime += DateTransUtils.calcOffset(timeOffset);
-            endTime += DateTransUtils.calcOffset(timeOffset);
-        }
-        List<String> dataTimeList = new ArrayList<>();
-        List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, 
cycleUnit);
-        for (Long time : dateRegion) {
-            String dataTime = DateTransUtils.millSecConvertToTimeStr(time, 
cycleUnit);
-            dataTimeList.add(dataTime);
-        }
-        return dataTimeList;
-    }
-
     public static List<BasicFileInfo> scanTaskBetweenTimes(String 
originPattern, String cycleUnit, String timeOffset,
             long startTime, long endTime, boolean isRetry) {
-        if (!isRetry) {
-            startTime += DateTransUtils.calcOffset(timeOffset);
-            endTime += DateTransUtils.calcOffset(timeOffset);
-        }
-        String strStartTime = 
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
-        String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, 
cycleUnit);
-        logger.info("{} scan time is between {} and {}",
-                new Object[]{originPattern, strStartTime, strEndTime});
-
-        return scanTaskBetweenTimes(cycleUnit, originPattern, startTime, 
endTime);
-    }
-
-    /* Scan log files and create tasks between two times. */
-    public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, 
String originPattern, long startTime,
-            long endTime) {
-        List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, 
cycleUnit);
-        List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>();
-        for (Long time : dateRegion) {
-            Calendar calendar = Calendar.getInstance();
-            calendar.setTimeInMillis(time);
-            String fileName = NewDateUtils.replaceDateExpression(calendar, 
originPattern);
-            ArrayList<String> allPaths = 
FilePathUtil.cutDirectoryByWildcard(fileName);
+        List<BasicFileInfo> infos = new ArrayList<>();
+        List<FinalPatternInfo> finalPatternInfos = 
Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset,
+                startTime, endTime, isRetry);
+        for (FinalPatternInfo finalPatternInfo : finalPatternInfos) {
+            ArrayList<String> allPaths = 
PatternUtil.cutDirectoryByWildcard(finalPatternInfo.finalPattern);
             String firstDir = allPaths.get(0);
             String secondDir = allPaths.get(0) + File.separator + 
allPaths.get(1);
-            ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, 
secondDir, fileName, 3,
+            ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, 
secondDir, finalPatternInfo.finalPattern, 3,
                     DEFAULT_FILE_MAX_NUM);
             for (String file : fileList) {
                 // TODO the time is not YYYYMMDDHH
-                String dataTime = DateTransUtils.millSecConvertToTimeStr(time, 
cycleUnit);
+                String dataTime = 
DateTransUtils.millSecConvertToTimeStr(finalPatternInfo.dataTime, cycleUnit);
                 BasicFileInfo info = new BasicFileInfo(file, dataTime);
                 logger.info("scan new task fileName {} ,dataTime {}", file, 
dataTime);
                 infos.add(info);
@@ -134,34 +97,4 @@ public class FileScanner {
         }
         return ret;
     }
-
-    @SuppressWarnings("unused")
-    private static ArrayList<String> getUpdatedOrNewFiles(String logFileName,
-            int maxFileNum) {
-        ArrayList<String> ret = new ArrayList<String>();
-        ArrayList<String> directories = FilePathUtil
-                .cutDirectoryByWildcardAndDateExpression(logFileName);
-        String parentDir = directories.get(0) + File.separator
-                + directories.get(1);
-
-        Pattern pattern = Pattern.compile(directories.get(2),
-                Pattern.CASE_INSENSITIVE);
-        for (File file : new File(parentDir).listFiles()) {
-            Matcher matcher = pattern.matcher(file.getName());
-            if (matcher.matches() && ret.size() < maxFileNum) {
-                ret.add(file.getAbsolutePath());
-            }
-        }
-        return ret;
-    }
-
-    public static void main(String[] args) {
-
-        ArrayList<String> fileList = FileScanner.getUpdatedOrNewFiles(
-                "f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100);
-        // fileList = FileScanner.getUpdatedOrNewFiles("F:\\abc\\1.txt", 100);
-        for (String fileName : fileList) {
-            System.out.println(fileName);
-        }
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
similarity index 95%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
index 949044d864..1fbbde3b56 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.task.file;
 
 import java.io.File;
 import java.util.Comparator;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
similarity index 97%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
index b4ddcfac53..3c3bca9d59 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.task.file;
 
 import org.apache.inlong.agent.utils.file.FileFinder;
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 4f49cfdd7d..0c104956d7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -24,9 +24,10 @@ import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.task.TaskAction;
 import org.apache.inlong.agent.plugin.task.AbstractTask;
 import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo;
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
-import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
 import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
@@ -45,8 +46,10 @@ import java.nio.file.WatchEvent;
 import java.nio.file.WatchEvent.Kind;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -62,13 +65,12 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- * Watch directory, if new valid files are created, create jobs 
correspondingly.
+ * Watch directory, if new valid files are created, create instance 
correspondingly.
  */
 public class LogFileTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileTask.class);
     public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
-    public static final String SCAN_CYCLE_RANCE = "-2";
     private static final int INSTANCE_QUEUE_CAPACITY = 10;
     private final Map<String, WatchEntity> watchers = new 
ConcurrentHashMap<>();
     private final Set<String> watchFailedDirs = new HashSet<>();
@@ -77,8 +79,8 @@ public class LogFileTask extends AbstractTask {
     public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
     public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
     private boolean retry;
-    private long startTime;
-    private long endTime;
+    private volatile long startTime;
+    private volatile long endTime;
     private boolean realTime = false;
     private Set<String> originPatterns;
     private long lastScanTime = 0;
@@ -95,19 +97,32 @@ public class LogFileTask extends AbstractTask {
     @Override
     protected void initTask() {
         instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
-        retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
+        retry = taskProfile.getBoolean(TaskConstants.FILE_TASK_RETRY, false);
         originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
                 .collect(Collectors.toSet());
         if 
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
             realTime = true;
         }
         if (retry) {
-            retryInit();
+            initRetryTask(taskProfile);
         } else {
             watchInit();
         }
     }
 
+    private boolean initRetryTask(TaskProfile profile) {
+        String dataTimeFrom = profile.get(TaskConstants.FILE_TASK_TIME_FROM, 
"");
+        String dataTimeTo = profile.get(TaskConstants.FILE_TASK_TIME_TO, "");
+        try {
+            startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom, 
profile.getCycleUnit());
+            endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo, 
profile.getCycleUnit());
+        } catch (ParseException e) {
+            LOGGER.error("retry task time error start {} end {}", 
dataTimeFrom, dataTimeTo, e);
+            return false;
+        }
+        return true;
+    }
+
     @Override
     protected List<InstanceProfile> getNewInstanceList() {
         if (retry) {
@@ -119,6 +134,7 @@ public class LogFileTask extends AbstractTask {
         while (list.size() < INSTANCE_QUEUE_CAPACITY && 
!instanceQueue.isEmpty()) {
             InstanceProfile profile = instanceQueue.poll();
             if (profile != null) {
+                LOGGER.info("test123 2 taskid {} {}", getTaskId(), 
profile.getInstanceId());
                 list.add(profile);
             }
         }
@@ -159,22 +175,14 @@ public class LogFileTask extends AbstractTask {
             LOGGER.error("task profile needs time offset");
             return false;
         }
-        if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) {
-            long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0);
-            long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0);
-            if (startTime == 0 || endTime == 0) {
-                LOGGER.error("retry task time error start {} end {}", 
startTime, endTime);
+        if (profile.getBoolean(TaskConstants.FILE_TASK_RETRY, false)) {
+            if (!initRetryTask(profile)) {
                 return false;
             }
         }
         return true;
     }
 
-    private void retryInit() {
-        startTime = taskProfile.getLong(TaskConstants.TASK_START_TIME, 0);
-        endTime = taskProfile.getLong(TaskConstants.TASK_END_TIME, 0);
-    }
-
     private void watchInit() {
         originPatterns.forEach((pathPattern) -> {
             addPathPattern(pathPattern);
@@ -182,12 +190,12 @@ public class LogFileTask extends AbstractTask {
     }
 
     public void addPathPattern(String originPattern) {
-        ArrayList<String> directories = 
FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
+        ArrayList<String> directories = 
PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
         String basicStaticPath = directories.get(0);
         LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, 
basicStaticPath});
         /* Remember the failed watcher creations. */
         if (!new File(basicStaticPath).exists()) {
-            LOGGER.warn(AgentErrMsg.DIRECTORY_NOT_FOUND_ERROR + 
basicStaticPath);
+            LOGGER.warn("DIRECTORY_NOT_FOUND_ERROR" + basicStaticPath);
             watchFailedDirs.add(originPattern);
             return;
         }
@@ -204,9 +212,9 @@ public class LogFileTask extends AbstractTask {
             watchFailedDirs.remove(originPattern);
         } catch (IOException e) {
             if (e.toString().contains("Too many open files") || 
e.toString().contains("打开的文件过多")) {
-                LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString());
+                LOGGER.error("WATCH_DIR_ERROR", e);
             } else {
-                LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e);
+                LOGGER.error("WATCH_DIR_ERROR", e);
             }
         } catch (Exception e) {
             LOGGER.error("addPathPattern:", e);
@@ -283,21 +291,12 @@ public class LogFileTask extends AbstractTask {
     }
 
     private List<BasicFileInfo> scanExistingFileByPattern(String 
originPattern) {
-        long startScanTime = startTime;
-        long endScanTime = endTime;
-        if (!retry) {
-            long currentTime = System.currentTimeMillis();
-            // only scan two cycle, like two hours or two days
-            long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + 
taskProfile.getCycleUnit());
-            startScanTime = currentTime + offset;
-            endScanTime = currentTime;
-        }
         if (realTime) {
             return FileScanner.scanTaskBetweenTimes(originPattern, 
CycleUnitType.HOUR, taskProfile.getTimeOffset(),
-                    startScanTime, endScanTime, retry);
+                    startTime, endTime, retry);
         } else {
             return FileScanner.scanTaskBetweenTimes(originPattern, 
taskProfile.getCycleUnit(),
-                    taskProfile.getTimeOffset(), startScanTime, endScanTime, 
retry);
+                    taskProfile.getTimeOffset(), startTime, endTime, retry);
         }
     }
 
@@ -328,14 +327,7 @@ public class LogFileTask extends AbstractTask {
     private void dealWithEventMapWithCycle() {
         long startScanTime = startTime;
         long endScanTime = endTime;
-        if (!retry) {
-            long currentTime = System.currentTimeMillis();
-            // only scan two cycle, like two hours or two days
-            long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + 
taskProfile.getCycleUnit());
-            startScanTime = currentTime + offset;
-            endScanTime = currentTime;
-        }
-        List<String> dataTimeList = FileScanner.getDataTimeList(startScanTime, 
endScanTime, taskProfile.getCycleUnit(),
+        List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, 
endScanTime, taskProfile.getCycleUnit(),
                 taskProfile.getTimeOffset(), retry);
         if (dataTimeList.isEmpty()) {
             LOGGER.error("getDataTimeList get empty list");
@@ -345,12 +337,16 @@ public class LogFileTask extends AbstractTask {
         // normal task first handle current data time
         if (!retry) {
             String current = dataTimeList.remove(dataTimeList.size() - 1);
-            dealEventMapByDataTime(current, true);
             dealtDataTime.add(current);
+            if (!dealEventMapByDataTime(current, true)) {
+                return;
+            }
         }
         dataTimeList.forEach(dataTime -> {
             dealtDataTime.add(dataTime);
-            dealEventMapByDataTime(dataTime, false);
+            if (!dealEventMapByDataTime(dataTime, false)) {
+                return;
+            }
         });
         for (String dataTime : eventMap.keySet()) {
             if (!dealtDataTime.contains(dataTime)) {
@@ -365,27 +361,27 @@ public class LogFileTask extends AbstractTask {
         }
     }
 
-    private void dealEventMapByDataTime(String dataTime, boolean 
isCurrentDataTime) {
+    private boolean dealEventMapByDataTime(String dataTime, boolean 
isCurrentDataTime) {
         Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.get(dataTime);
         if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
-            return;
+            return true;
         }
         if (realTime || shouldStartNow(dataTime)) {
-            /* These codes will sort the FileCreationEvents by create time. */
-            Set<InstanceProfile> sortedEvents = new 
TreeSet<>(sameDataTimeEvents.values());
-            /* Check the file end with event creation time in asc order. */
+            Set<InstanceProfile> sortedEvents = new 
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+            sortedEvents.addAll(sameDataTimeEvents.values());
             for (InstanceProfile sortEvent : sortedEvents) {
                 String fileName = sortEvent.getInstanceId();
                 InstanceProfile profile = sameDataTimeEvents.get(fileName);
                 if (!isCurrentDataTime && isFull()) {
-                    return;
+                    return false;
                 }
                 if (!instanceQueue.offer(profile)) {
-                    return;
+                    return false;
                 }
                 sameDataTimeEvents.remove(fileName);
             }
         }
+        return true;
     }
 
     /*
@@ -484,14 +480,14 @@ public class LogFileTask extends AbstractTask {
 
     private void handleFilePath(Path filePath, WatchEntity entity) {
         String newFileName = filePath.toFile().getAbsolutePath();
-        LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern());
+        LOGGER.info("new file {} {}", newFileName, entity.getPattern());
         Matcher matcher = entity.getPattern().matcher(newFileName);
         if (matcher.matches() || matcher.lookingAt()) {
-            LOGGER.info("matched file {} {}", newFileName, 
entity.getOriginPattern());
+            LOGGER.info("matched file {} {}", newFileName, 
entity.getPattern());
             String dataTime = getDataTimeFromFileName(newFileName, 
entity.getOriginPattern(),
                     entity.getDateExpression());
             if (!checkFileNameForTime(newFileName, entity)) {
-                LOGGER.error(AgentErrMsg.FILE_ERROR + "File Timeout {} {}", 
newFileName, dataTime);
+                LOGGER.error("File Timeout {} {}", newFileName, dataTime);
                 return;
             }
             addToEvenMap(newFileName, dataTime);
@@ -566,8 +562,7 @@ public class LogFileTask extends AbstractTask {
          * Register a new watch service on the path if the old watcher is 
invalid.
          */
         if (!key.isValid()) {
-            LOGGER.warn(AgentErrMsg.WATCHER_INVALID + "Invalid Watcher {}",
-                    contextPath.getFileName());
+            LOGGER.warn("Invalid Watcher {}", contextPath.getFileName());
             try {
                 WatchService oldService = entity.getWatchService();
                 oldService.close();
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
index af6d018a1d..b33eaff818 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.agent.plugin.task.file;
 
-import org.apache.inlong.agent.plugin.utils.file.DateUtils;
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
-import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
-import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NonRegexPatternPosition;
+import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
@@ -69,11 +69,11 @@ public class WatchEntity {
             String cycleUnit) {
         this.watchService = watchService;
         this.originPattern = originPattern;
-        ArrayList<String> directoryLayers = 
FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
+        ArrayList<String> directoryLayers = 
PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
         this.basicStaticPath = directoryLayers.get(0);
         this.regexPattern = 
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
         pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL | Pattern.MULTILINE);
-        ArrayList<String> directories = 
FilePathUtil.cutDirectoryByWildcard(originPattern);
+        ArrayList<String> directories = 
PatternUtil.cutDirectoryByWildcard(originPattern);
         this.originPatternWithoutFileName = directories.get(0);
         this.patternWithoutFileName = Pattern
                 
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
@@ -91,6 +91,7 @@ public class WatchEntity {
     @Override
     public String toString() {
         return "WatchEntity [parentPathName=" + basicStaticPath
+                + ", pattern=" + pattern
                 + ", readFilePattern=" + regexPattern
                 + ", dateExpression=" + dateExpression + ", 
originPatternWithoutFileName="
                 + originPatternWithoutFileName + ", containRegexPattern="
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
similarity index 58%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
index ae57acbca8..2366dbae06 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
 
 import hirondelle.date4j.DateTime;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.Objects;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
@@ -42,16 +39,6 @@ public class DateUtils {
             Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
     private String dateFormat = "YYYYMMDDhhmmss";
 
-    public DateUtils() {
-
-    }
-
-    public DateUtils(String timeFormat) {
-        if (timeFormat != null && !timeFormat.isEmpty()) {
-            dateFormat = timeFormat;
-        }
-    }
-
     public static String getSubTimeFormat(String format, int length) {
         // format may be "YYYYMMDDhhmmss" | "YYYY_MM_DD_hh_mm_ss"
         int formatLen = format.length();
@@ -138,35 +125,6 @@ public class DateUtils {
                 : new PathDateExpression(longestPattern, position));
     }
 
-    public static String formatTime(long time) {
-        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
-        df.setTimeZone(TimeZone.getTimeZone("GMT+8:00"));
-        return df.format(new Date(time));
-    }
-
-    public static boolean compare(String time, int offset)
-            throws ParseException {
-        long value = 1000 * 60 * 60 * 24;
-        SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
-        long to = System.currentTimeMillis();
-        long from = df.parse(time.substring(0, 8)).getTime();
-        if ((to - from) / value > offset) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    public static boolean compare(long time, int offset) {
-        long value = 1000 * 60 * 60 * 24;
-        long to = System.currentTimeMillis();
-        if ((to - time) / value > offset) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
     public void init(String timeFormat) {
         if (timeFormat != null && !timeFormat.isEmpty()) {
             dateFormat = timeFormat;
@@ -221,38 +179,6 @@ public class DateUtils {
         return sb.toString();
     }
 
-    public String getFormatSpecifiedTime(String specifiedTime) {
-        if (specifiedTime == null || specifiedTime.length() == 0) {
-            return specifiedTime;
-        }
-
-        int formatLen = dateFormat.length();
-
-        if (specifiedTime.length() == formatLen
-                && !specifiedTime.matches(DIGIT_STR)) {
-            return specifiedTime;
-        }
-
-        StringBuilder retSb = new StringBuilder();
-        int specifiedInx = 0;
-        for (int i = 0; i < formatLen; i++) {
-            char tmpChar = dateFormat.charAt(i);
-
-            if (tmpChar != 'Y' && tmpChar != 'M' && tmpChar != 'D'
-                    && tmpChar != 'h' && tmpChar != 'm') {
-                retSb.append(tmpChar);
-            } else {
-                retSb.append(specifiedTime.charAt(specifiedInx++));
-            }
-        }
-
-        logger.info(
-                "TimeRegex {} <> specifiedTime {} not match, format 
specifiedTime {}",
-                new Object[]{dateFormat, specifiedTime, retSb.toString()});
-
-        return retSb.toString();
-    }
-
     public String getDate(String src, String limit) {
         if (src == null || src.trim().isEmpty()) {
             return "";
@@ -333,123 +259,4 @@ public class DateUtils {
         }
         return dt.format(outputFormat);
     }
-
-    public String getAttrPunit(String attrs) {
-        String punit = null;
-        if (attrs != null && attrs.contains("&p=")) {
-            for (String attr : attrs.split("&")) {
-                if (attr.startsWith("p=") && attr.split("=").length == 2) {
-                    punit = attr.split("=")[1];
-                    break;
-                }
-            }
-        }
-
-        return punit;
-    }
-
-    public String getSpecifiedDate(String src, String limit, String punit) {
-        String ret = getDate(src, limit);
-        return formatCurrPeriod(ret, punit);
-    }
-
-    public String normalizeTimeRegex(String src) {
-        return getSubTimeFormat(dateFormat, src.length());
-    }
-
-    public String getCurrentDir(String src, String timeOffset) {
-        Matcher m = pattern.matcher(src);
-        StringBuffer sb = new StringBuffer();
-        while (m.find()) {
-            String oneMatch = m.group(0);
-            String currTimeStr = getDate(oneMatch, timeOffset);
-            m.appendReplacement(sb, currTimeStr);
-        }
-        m.appendTail(sb);
-        return sb.toString();
-    }
-
-    public String getCurrentDirByPunit(String src, String timeOffset,
-            String punit) {
-        Matcher m = pattern.matcher(src);
-        StringBuffer sb = new StringBuffer();
-        while (m.find()) {
-            String oneMatch = m.group(0);
-            String currTimeStr = getSpecifiedDate(oneMatch, timeOffset, punit);
-            m.appendReplacement(sb, currTimeStr);
-        }
-        m.appendTail(sb);
-
-        return sb.toString();
-    }
-
-    public String getSpecifiedDir(String src, String specifiedDate) {
-        Matcher m = pattern.matcher(src);
-        StringBuffer sb = new StringBuffer();
-
-        while (m.find()) {
-            String oneMatch = m.group(0);
-            StringBuilder tmpSb = new StringBuilder();
-            int specifiedDateIdx = 0;
-
-            for (int i = 0; i < oneMatch.length(); i++) {
-                char matchChar = oneMatch.charAt(i);
-                if (matchChar != 'Y' && matchChar != 'M' && matchChar != 'D'
-                        && matchChar != 'h' && matchChar != 'm') {
-                    tmpSb.append(matchChar);
-                } else {
-                    char dateChar = specifiedDate.charAt(specifiedDateIdx);
-                    while (String.valueOf(dateChar).matches("\\D")) {
-                        dateChar = specifiedDate.charAt(++specifiedDateIdx);
-                    }
-                    tmpSb.append(dateChar);
-                    specifiedDateIdx++;
-                }
-            }
-            m.appendReplacement(sb, tmpSb.toString());
-        }
-        m.appendTail(sb);
-        return sb.toString();
-    }
-
-    // format current period starting less-than-hour task
-    // * for example: ten-minute task:
-    // * currPeriodDataTime is 201303271905
-    // * formated value is 201303271900
-    public String formatCurrPeriod(String src, String punit) {
-        if (src == null || punit == null || src.length() != 12) {
-            return src;
-        }
-
-        String prefixMinuteStr = src.substring(0, src.length() - 2);
-        String minuteStr = src.substring(src.length() - 2, src.length());
-
-        if ("n".equals(punit)) {
-            if (minuteStr.compareTo("30") < 0) {
-                minuteStr = "00";
-            } else {
-                minuteStr = "30";
-            }
-        } else if ("q".equals(punit)) {
-            if (minuteStr.compareTo("15") < 0) {
-                minuteStr = "00";
-            } else if (minuteStr.compareTo("30") < 0) {
-                minuteStr = "15";
-            } else if (minuteStr.compareTo("45") < 0) {
-                minuteStr = "30";
-            } else {
-                minuteStr = "45";
-            }
-        } else if ("t".equals(punit)) {
-            minuteStr = minuteStr.charAt(0) + "0";
-        } else if ("f".equals(punit)) {
-            if (minuteStr.substring(1).compareTo("5") < 0) {
-                minuteStr = minuteStr.charAt(0) + "0";
-            } else {
-                minuteStr = minuteStr.charAt(0) + "5";
-            }
-        }
-
-        return prefixMinuteStr + minuteStr;
-    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java
similarity index 96%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java
index b7222a191a..ec1167d06e 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
 
 public class MatchPoint {
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
similarity index 92%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
index 563a168885..a80f088e0d 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
 
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.utils.DateTransUtils;
@@ -109,45 +109,6 @@ public class NewDateUtils {
                 .format(new Date(getDateTime(calendar, cycleUnit, 
offset).getTimeInMillis()));
     }
 
-    private static Calendar getCurDate(String cycleUnit, String offset) {
-        if (cycleUnit == null || cycleUnit.length() == 0) {
-            return null;
-        }
-
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTimeInMillis(System.currentTimeMillis());
-
-        return getDateTime(calendar, cycleUnit, offset);
-    }
-
-    public static String getDateTime(String dataTime, String cycleUnit, String 
offset) {
-        String retTime = DateTransUtils.millSecConvertToTimeStr(
-                System.currentTimeMillis(), cycleUnit);
-        try {
-            long time = DateTransUtils.timeStrConvertToMillSec(dataTime, 
cycleUnit);
-
-            Calendar calendar = Calendar.getInstance();
-            calendar.setTimeInMillis(time);
-            Calendar retCalendar = getDateTime(calendar, cycleUnit, offset);
-            if (retCalendar == null) {
-                return dataTime;
-            }
-
-            retTime = 
DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
-                    cycleUnit);
-        } catch (Exception e) {
-            logger.error("getDateTime error: ", e);
-        }
-        return retTime;
-    }
-
-    public static String getDateTime(long time, String cycleUnit, String 
offset) {
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTimeInMillis(time);
-        Calendar retCalendar = getDateTime(calendar, cycleUnit, offset);
-        return 
DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), 
cycleUnit);
-    }
-
     private static Calendar getDateTime(Calendar calendar, String cycleUnit, 
String offset) {
         int cycleNumber = (cycleUnit.length() <= 1
                 ? 1
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java
similarity index 97%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java
index 0732d184e0..a12140a314 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
 
 /*
  * Describe the nearest character around the date time expression. For 
example, for date source name
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java
similarity index 96%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java
index c75e0398b8..d73329424f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
 
 /* The date expression in the file path. */
 public class PathDateExpression {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java
similarity index 91%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
rename to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java
index 912b035642..dc27addb07 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.utils.file;
+package org.apache.inlong.agent.plugin.utils.regex;
 
 import org.apache.commons.lang.StringUtils;
 
 import java.io.File;
 import java.util.ArrayList;
 
-public class FilePathUtil {
+public class PatternUtil {
 
     private static final String YEAR = "YYYY";
     private static final String MONTH = "MM";
@@ -158,9 +158,18 @@ public class FilePathUtil {
     }
 
     public static boolean isSameDir(String fileName1, String fileName2) {
-        ArrayList<String> ret1 = 
FilePathUtil.cutDirectoryByWildcard(fileName1);
-        ArrayList<String> ret2 = 
FilePathUtil.cutDirectoryByWildcard(fileName2);
+        ArrayList<String> ret1 = PatternUtil.cutDirectoryByWildcard(fileName1);
+        ArrayList<String> ret2 = PatternUtil.cutDirectoryByWildcard(fileName2);
         return ret1.get(0).equals(ret2.get(0));
     }
 
+    public static String getBeforeFirstWildcard(String input) {
+        String sign = "\\^$*+?{(|[.";
+        int firstWildcardIndex = StringUtils.indexOfAny(input, sign);
+        if (firstWildcardIndex != -1) {
+            return input.substring(0, firstWildcardIndex);
+        } else {
+            return "";
+        }
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
new file mode 100644
index 0000000000..b3153e81fe
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.regex;
+
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+/**
+ * Static helpers that expand a start/end time window into the individual data times
+ * of that window and into patterns whose date expression has been replaced per data time.
+ */
+public class Scanner {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Scanner.class);
+    // Prefix concatenated with the cycle unit and handed to DateTransUtils.calcOffset
+    // in getTimeRange to derive the start of the non-retry window.
+    // NOTE(review): "RANCE" looks like a typo for "RANGE"; the constant is public, so a
+    // rename would need a compatibility alias.
+    public static final String SCAN_CYCLE_RANCE = "-2";
+
+    /**
+     * Holder for the start and end of a scan window.
+     * NOTE(review): the values are passed to DateTransUtils.millSecConvertToTimeStr, so
+     * they are presumably epoch milliseconds; confirm against that helper.
+     */
+    public static class TimeRange {
+
+        public Long startTime;
+        public Long endTime;
+
+        public TimeRange(Long startTime, Long endTime) {
+            this.startTime = startTime;
+            this.endTime = endTime;
+        }
+    }
+
+    /**
+     * Pairs a pattern produced by NewDateUtils.replaceDateExpression with the time
+     * value that was used for the replacement.
+     */
+    public static class FinalPatternInfo {
+
+        public String finalPattern;
+        public Long dataTime;
+
+        public FinalPatternInfo(String finalPattern, Long dataTime) {
+            this.finalPattern = finalPattern;
+            this.dataTime = dataTime;
+        }
+    }
+
+    /**
+     * Builds one FinalPatternInfo for every time returned by NewDateUtils.getDateRegion
+     * over the effective scan range (see getTimeRange), and logs that range.
+     *
+     * @param originPattern pattern containing the date expression to replace
+     * @param cycleUnit cycle unit passed through to the date helpers
+     * @param timeOffset offset expression passed to getTimeRange
+     * @param startTime range start, only honoured when isRetry is true
+     * @param endTime range end, only honoured when isRetry is true
+     * @param isRetry whether the supplied start/end times should be used as given
+     * @return the substituted patterns with their data times, in date-region order
+     */
+    public static List<FinalPatternInfo> getFinalPatternInfos(String 
originPattern, String cycleUnit, String timeOffset,
+            long startTime, long endTime, boolean isRetry) {
+        TimeRange range = Scanner.getTimeRange(startTime, endTime, cycleUnit, 
timeOffset, isRetry);
+        String strStartTime = 
DateTransUtils.millSecConvertToTimeStr(range.startTime, cycleUnit);
+        String strEndTime = 
DateTransUtils.millSecConvertToTimeStr(range.endTime, cycleUnit);
+        LOGGER.info("{} scan time is between {} and {}", originPattern, 
strStartTime, strEndTime);
+        List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime, 
range.endTime, cycleUnit);
+        List<FinalPatternInfo> finalPatternList = new ArrayList<>();
+        for (Long time : dateRegion) {
+            // Calendar.getInstance() uses the JVM default time zone and locale.
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTimeInMillis(time);
+            FinalPatternInfo finalPatternInfo = new FinalPatternInfo(
+                    NewDateUtils.replaceDateExpression(calendar, 
originPattern), time);
+            finalPatternList.add(finalPatternInfo);
+        }
+        return finalPatternList;
+    }
+
+    /**
+     * Returns the data times of the effective scan range (see getTimeRange) as strings,
+     * one per entry of NewDateUtils.getDateRegion, formatted by
+     * DateTransUtils.millSecConvertToTimeStr for the given cycle unit.
+     *
+     * @param startTime range start, only honoured when isRetry is true
+     * @param endTime range end, only honoured when isRetry is true
+     * @param cycleUnit cycle unit passed through to the date helpers
+     * @param timeOffset offset expression passed to getTimeRange
+     * @param isRetry whether the supplied start/end times should be used as given
+     * @return the formatted data times, in date-region order
+     */
+    public static List<String> getDataTimeList(long startTime, long endTime, 
String cycleUnit, String timeOffset,
+            boolean isRetry) {
+        TimeRange range = getTimeRange(startTime, endTime, cycleUnit, 
timeOffset, isRetry);
+        List<String> dataTimeList = new ArrayList<>();
+        List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime, 
range.endTime, cycleUnit);
+        for (Long time : dateRegion) {
+            String dataTime = DateTransUtils.millSecConvertToTimeStr(time, 
cycleUnit);
+            dataTimeList.add(dataTime);
+        }
+        return dataTimeList;
+    }
+
+    /**
+     * Determines the range to scan.
+     * <p>
+     * When isRetry is true the given startTime and endTime are returned unchanged.
+     * Otherwise both are ignored and recomputed from System.currentTimeMillis():
+     * the end is the current time plus calcOffset(timeOffset), and the start is that
+     * same value plus calcOffset(SCAN_CYCLE_RANCE + cycleUnit).
+     *
+     * @param startTime range start, only honoured when isRetry is true
+     * @param endTime range end, only honoured when isRetry is true
+     * @param cycleUnit cycle unit appended to SCAN_CYCLE_RANCE for the window length
+     * @param timeOffset offset expression applied to both ends of the computed window
+     * @param isRetry whether the supplied start/end times should be used as given
+     * @return the effective start and end of the scan window
+     */
+    public static TimeRange getTimeRange(long startTime, long endTime, String 
cycleUnit, String timeOffset,
+            boolean isRetry) {
+        if (!isRetry) {
+            long currentTime = System.currentTimeMillis();
+            // only scan two cycle, like two hours or two days
+            // NOTE(review): presumably calcOffset("-2" + cycleUnit) yields a negative
+            // span of two cycles in milliseconds; confirm against DateTransUtils.
+            long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + 
cycleUnit);
+            startTime = currentTime + offset + 
DateTransUtils.calcOffset(timeOffset);
+            endTime = currentTime + DateTransUtils.calcOffset(timeOffset);
+        }
+        return new TimeRange(startTime, endTime);
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 7730bb29c8..3dc4f8ab15 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -83,7 +83,7 @@ public class AgentBaseTestsHelper {
     }
 
     public TaskProfile getTaskProfile(int taskId, String pattern, String 
dataContentStyle, boolean retry,
-            Long startTime, Long endTime,
+            String startTime, String endTime,
             TaskStateEnum state, String cycleUnit, String timeZone, 
List<String> filterStreams) {
         DataConfig dataConfig = getDataConfig(taskId, pattern, 
dataContentStyle, retry, startTime, endTime,
                 state, cycleUnit, timeZone,
@@ -92,9 +92,9 @@ public class AgentBaseTestsHelper {
         return profile;
     }
 
-    private DataConfig getDataConfig(int taskId, String pattern, String 
dataContentStyle, boolean retry, Long startTime,
-            Long endTime,
-            TaskStateEnum state, String cycleUnit, String timeZone, 
List<String> filterStreams) {
+    private DataConfig getDataConfig(int taskId, String pattern, String 
dataContentStyle, boolean retry,
+            String startTime, String endTime, TaskStateEnum state, String 
cycleUnit, String timeZone,
+            List<String> filterStreams) {
         DataConfig dataConfig = new DataConfig();
         dataConfig.setInlongGroupId("testGroupId");
         dataConfig.setInlongStreamId("testStreamId");
@@ -110,8 +110,8 @@ public class AgentBaseTestsHelper {
         fileTaskConfig.setMaxFileCount(100);
         fileTaskConfig.setCycleUnit(cycleUnit);
         fileTaskConfig.setRetry(retry);
-        fileTaskConfig.setStartTime(startTime);
-        fileTaskConfig.setEndTime(endTime);
+        fileTaskConfig.setDataTimeFrom(startTime);
+        fileTaskConfig.setDataTimeTo(endTime);
         // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
         fileTaskConfig.setDataContentStyle(dataContentStyle);
         fileTaskConfig.setDataSeparator("|");
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
index f2c5f25ee3..61c94c4dd1 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java
@@ -59,7 +59,7 @@ public class TestInstanceManager {
         helper = new 
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
         Store basicInstanceStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
-        taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, 
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
+        taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, CycleUnitType.HOUR,
                 "GMT+6:00", null);
         Store taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
         TaskStore taskStore = new TaskStore(taskBasicStore);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 5524c5e96e..066776d32f 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -47,7 +47,7 @@ public class KafkaSinkTest {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index 93702fad16..d7733259ab 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -47,7 +47,7 @@ public class PulsarSinkTest {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index afeb3565e2..5a1168edef 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -24,7 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.message.file.OffsetAckInfo;
 import org.apache.inlong.agent.message.file.SenderMessage;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
@@ -70,7 +70,7 @@ public class TestSenderManager {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
         helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("", fileName,
                 taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index df70039459..6ee892c914 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -27,7 +27,7 @@ import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
 import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
@@ -77,7 +77,7 @@ public class TestLogFileSource {
     private LogFileSource getSource(int taskId, long offset) {
         try {
             String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", false, 0L, 0L,
+            TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", false, "", "",
                     TaskStateEnum.RUNNING, "D",
                     "GMT+8:00", null);
             String fileName = 
LOADER.getResource("test/20230928_1.txt").getPath();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
index 2680c01e06..4f2e90870b 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -121,7 +121,7 @@ public class TestRedisSource {
         final String command = "zscore";
         final String subOperation = "set,del";
 
-        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
0L, 0L, TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
"", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         profile = taskProfile.createInstanceProfile("",
                 "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 2a90bdc37a..377e5ae913 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -136,7 +136,7 @@ public class TestSQLServerSource {
         final String tableName = "test_source";
         final String serverName = "server-01";
 
-        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
0L, 0L, TaskStateEnum.RUNNING, "D",
+        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
"", "", TaskStateEnum.RUNNING, "D",
                 "GMT+8:00", null);
         instanceProfile = taskProfile.createInstanceProfile("",
                 "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 1ef3b5db1e..440c4a5208 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -40,10 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -83,17 +81,17 @@ public class TestLogFileTask {
     public void testScan() throws Exception {
         doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"),
                 resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt", 
CycleUnitType.DAY, Arrays.asList("20230928"),
-                "2023-09-28 00:00:00", "2023-09-30 23:00:00");
+                "20230928", "20230930");
         doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
                 resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt",
-                CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023-09-28 
00:00:00", "2023-09-30 23:00:00");
+                CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800", 
"2023093023");
         doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt", 
"testScan/202309301059_1/test_1.txt"),
                 resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt",
-                CycleUnitType.MINUTE, Arrays.asList("202309281030", 
"202309301059"), "2023-09-28 00:00:00",
-                "2023-09-30 23:00:00");
+                CycleUnitType.MINUTE, Arrays.asList("202309281030", 
"202309301059"), "202309280000",
+                "202309302300");
         doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
                 resourceParentPath + "/YYYYMMDD/hh/mm.txt",
-                CycleUnitType.MINUTE, Arrays.asList("202410302359"), 
"2024-10-30 00:00:00", "2024-10-31 00:00:00");
+                CycleUnitType.MINUTE, Arrays.asList("202410302359"), 
"202410300000", "202410310000");
     }
 
     private void doTest(int taskId, List<String> resources, String pattern, 
String cycle, List<String> srcDataTimes,
@@ -103,20 +101,14 @@ public class TestLogFileTask {
         for (int i = 0; i < resources.size(); i++) {
             resourceName.add(LOADER.getResource(resources.get(i)).getPath());
         }
-        TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", true, 0L, 0L, TaskStateEnum.RUNNING,
-                cycle,
-                "GMT+8:00", null);
+        TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, 
"csv", true, "", "", TaskStateEnum.RUNNING,
+                cycle, "GMT+8:00", null);
         LogFileTask dayTask = null;
         final List<String> fileName = new ArrayList();
         final List<String> dataTime = new ArrayList();
         try {
-
-            Date parse = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").parse(startTime);
-            long start = parse.getTime();
-            parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endTime);
-            long end = parse.getTime();
-            taskProfile.setLong(TaskConstants.TASK_START_TIME, start);
-            taskProfile.setLong(TaskConstants.TASK_END_TIME, end);
+            taskProfile.set(TaskConstants.FILE_TASK_TIME_FROM, startTime);
+            taskProfile.set(TaskConstants.FILE_TASK_TIME_TO, endTime);
             dayTask = PowerMockito.spy(new LogFileTask());
             PowerMockito.doAnswer(invocation -> {
                 fileName.add(invocation.getArgument(0));
@@ -128,7 +120,7 @@ public class TestLogFileTask {
             dayTask.init(manager, taskProfile, 
manager.getInstanceBasicStore());
             EXECUTOR_SERVICE.submit(dayTask);
         } catch (Exception e) {
-            LOGGER.error("source init error {}", e);
+            LOGGER.error("source init error", e);
             Assert.assertTrue("source init error", false);
         }
         await().atMost(10, TimeUnit.SECONDS)
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
index 608d3adec6..014c5ce4e2 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java
@@ -58,7 +58,7 @@ public class TestTaskManager {
             manager = new TaskManager();
             TaskStore taskStore = manager.getTaskStore();
             for (int i = 1; i <= 10; i++) {
-                TaskProfile taskProfile = helper.getTaskProfile(i, pattern, 
"csv", false, 0L, 0L, TaskStateEnum.RUNNING,
+                TaskProfile taskProfile = helper.getTaskProfile(i, pattern, 
"csv", false, "", "", TaskStateEnum.RUNNING,
                         "D", "GMT+8:00", null);
                 taskProfile.setTaskClass(MockTask.class.getCanonicalName());
                 taskStore.storeTask(taskProfile);
@@ -74,7 +74,7 @@ public class TestTaskManager {
             Assert.assertTrue("manager start error", false);
         }
 
-        TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING,
+        TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING,
                 "D", "GMT+8:00", null);
         String taskId1 = taskProfile1.getTaskId();
         taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
@@ -99,7 +99,7 @@ public class TestTaskManager {
         Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == 
TaskStateEnum.RUNNING);
 
         // test delete
-        TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", 
false, 0L, 0L, TaskStateEnum.RUNNING,
+        TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", 
false, "", "", TaskStateEnum.RUNNING,
                 "D", "GMT+8:00", null);
         taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles2 = new ArrayList<>();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index cf1b7c8f95..a1f13122e3 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
-import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 
@@ -91,6 +91,8 @@ public class TestUtils {
                 Arrays.asList("/data/log_minute", "minute_YYYYMMDDhh*", 
"mm.log_[0-9]+"));
         testCutDirectoryByWildcard("/data/123+/YYYYMMDDhhmm.log",
                 Arrays.asList("/data", "123+", "YYYYMMDDhhmm.log"));
+        testCutDirectoryByWildcard("/data/2024112610*/test.log",
+                Arrays.asList("/data", "2024112610*", "test.log"));
 
         /*
          * 1 cut the file name 2 cut the path contains wildcard or date 
expression
@@ -103,6 +105,20 @@ public class TestUtils {
                 Arrays.asList("/data", "123*/MMDD", "test.log"));
         
testCutDirectoryByWildcardAndDateExpression("/data/YYYYMMDD/123*/test.log",
                 Arrays.asList("/data", "YYYYMMDD/123*", "test.log"));
+
+        /*
+         * get the string before the first wildcard
+         */
+        testGetBeforeFirstWildcard("/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm.log",
+                "/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm");
+        testGetBeforeFirstWildcard("/data/123*/MMDD/test.log",
+                "/data/123");
+        testGetBeforeFirstWildcard("/data/YYYYMMDD/123*/test.log",
+                "/data/YYYYMMDD/123");
+        testGetBeforeFirstWildcard("test/65535_YYYYMMDD_hh00.log",
+                "test/65535_YYYYMMDD_hh00");
+        testGetBeforeFirstWildcard("/data/YYYYMMDD/*123/test.log",
+                "/data/YYYYMMDD/");
     }
 
     private void testReplaceDateExpression(String src, String dst) throws 
ParseException {
@@ -113,12 +129,17 @@ public class TestUtils {
     }
 
     private void testCutDirectoryByWildcard(String src, List<String> dst) {
-        ArrayList<String> directories = 
FilePathUtil.cutDirectoryByWildcard(src);
+        ArrayList<String> directories = 
PatternUtil.cutDirectoryByWildcard(src);
         Assert.assertEquals(directories, dst);
     }
 
+    /** Asserts that PatternUtil.getBeforeFirstWildcard(src) returns dst. */
+    private void testGetBeforeFirstWildcard(String src, String dst) {
+        String temp = PatternUtil.getBeforeFirstWildcard(src);
+        Assert.assertEquals(dst, temp);
+    }
+
     private void testCutDirectoryByWildcardAndDateExpression(String src, 
List<String> dst) {
-        ArrayList<String> directoryLayers = 
FilePathUtil.cutDirectoryByWildcardAndDateExpression(src);
+        ArrayList<String> directoryLayers = 
PatternUtil.cutDirectoryByWildcardAndDateExpression(src);
         Assert.assertEquals(directoryLayers, dst);
     }
 

Reply via email to