This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a97c353dd [INLONG-9335][Agent] Bring cycle parameters when creating an instance (#9336)
8a97c353dd is described below

commit 8a97c353ddcabf70e319d9e006293dda383d15ae
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Nov 27 14:43:23 2023 +0800

    [INLONG-9335][Agent] Bring cycle parameters when creating an instance (#9336)
    
    * [INLONG-9335][Agent] Bring cycle parameters when creating an instance
    
    * [INLONG-9335][Agent] Bring cycle parameters when creating an instance
---
 .../org/apache/inlong/agent/conf/TaskProfile.java  |  7 ++-
 .../inlong/agent/constant/CycleUnitType.java       | 25 ++++++++
 .../inlong/agent/constant/FileTriggerType.java     | 66 ---------------------
 .../apache/inlong/agent/utils/DateTransUtils.java  |  2 +-
 .../agent/core/instance/TestInstanceManager.java   |  6 +-
 .../agent/plugin/task/filecollect/FileScanner.java | 20 +++----
 .../task/filecollect/LogFileCollectTask.java       | 68 +++++++++++++++-------
 .../inlong/agent/plugin/utils/file/DateUtils.java  |  6 +-
 .../agent/plugin/utils/file/NewDateUtils.java      |  3 +
 .../sinks/filecollect/TestSenderManager.java       |  2 +-
 .../agent/plugin/sources/TestLogFileSource.java    |  4 +-
 .../agent/plugin/sources/TestMqttConnect.java      |  3 +-
 .../inlong/agent/plugin/utils/TestUtils.java       |  1 +
 13 files changed, 100 insertions(+), 113 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index cbad21e499..de863a7aa0 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -62,7 +62,7 @@ public class TaskProfile extends AbstractConfiguration {
     }
 
     public String getTimeOffset() {
-        return get(TaskConstants.TASK_FILE_TIME_OFFSET);
+        return get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
     }
 
     public String getTimeZone() {
@@ -118,7 +118,8 @@ public class TaskProfile extends AbstractConfiguration {
         return GSON.toJson(getConfigStorage());
     }
 
-    public InstanceProfile createInstanceProfile(String instanceClass, String 
fileName, String dataTime,
+    public InstanceProfile createInstanceProfile(String instanceClass, String 
fileName, String cycleUnit,
+            String dataTime,
             long fileUpdateTime) {
         InstanceProfile instanceProfile = 
InstanceProfile.parseJsonStr(toJsonStr());
         instanceProfile.setInstanceClass(instanceClass);
@@ -126,7 +127,7 @@ public class TaskProfile extends AbstractConfiguration {
         instanceProfile.setSourceDataTime(dataTime);
         Long sinkDataTime = 0L;
         try {
-            sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime, 
getCycleUnit(),
+            sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime, 
cycleUnit,
                     TimeZone.getTimeZone(getTimeZone()));
         } catch (ParseException e) {
             logger.error("createInstanceProfile ParseException error: ", e);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
new file mode 100644
index 0000000000..d12e825b85
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+public class CycleUnitType {
+
+    public static final String DAY = "D";
+    public static final String HOUR = "h";
+    public static final String REAL_TIME = "R";
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java
deleted file mode 100644
index adc7109fa5..0000000000
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.constant;
-
-/**
- * Collection type of file data
- */
-public class FileTriggerType {
-
-    /**
-     * Increment only collect newly created files content.
-     *
-     * <p>Here is an example. Collect task submit at '2022-01-01 23:00:00' 
with pattern '/bin/*.sh'.
-     * <blockquote><pre>
-     * .
-     * └── [2022-01-01 20:49:42]  bin
-     *     ├── [2022-01-01 20:10:00]  managerctl
-     *     ├── [2022-01-01 21:10:00]  restart.sh
-     *     ├── [2022-01-01 22:10:00]  shutdown.sh
-     *     └── [2022-01-01 23:49:00]  startup.sh
-     * </pre></blockquote>
-     *
-     * <p>It Finally collect file is:
-     * <blockquote><pre>
-     * ./bin/startup.sh
-     * </pre></blockquote>
-     */
-    public static final String INCREMENT = "INCREMENT";
-
-    /**
-     * FULL collect existing files, as well as newly created files.
-     *
-     * <p>Here is an example. Collect task submit at '2022-01-01 23:00:00' 
with pattern '/bin/*.sh'.
-     * <blockquote><pre>
-     * .
-     * └── [2022-01-01 20:49:42]  bin
-     *     ├── [2022-01-01 20:10:00]  managerctl
-     *     ├── [2022-01-01 21:10:00]  restart.sh
-     *     ├── [2022-01-01 22:10:00]  shutdown.sh
-     *     └── [2022-01-01 23:49:00]  startup.sh
-     * </pre></blockquote>
-     *
-     * <p>It Finally collect file is:
-     * <blockquote><pre>
-     * ./bin/startup.sh
-     * ./bin/shutdown.sh
-     * ./bin/startup.sh
-     * </pre></blockquote>
-     */
-    public static final String FULL = "FULL";
-}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 2aa08742e8..55182c7dd8 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -56,7 +56,7 @@ public class DateTransUtils {
         } else if (cycleUnit.contains("m") && time.length() == 12) {
             df = new SimpleDateFormat("yyyyMMddHHmm");
         } else {
-            logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit);
+            logger.error("time {}, cycleUnit {} can't parse!", time, 
cycleUnit);
             throw new ParseException(time, 0);
         }
         try {
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 910a32a46a..558bed0204 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -68,7 +68,8 @@ public class TestInstanceManager {
     public void testInstanceManager() {
         long timeBefore = AgentUtils.getCurrentTime();
         InstanceProfile profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", 
AgentUtils.getCurrentTime());
+                helper.getTestRootDir() + "/2023092710_1.txt", 
taskProfile.getCycleUnit(), "2023092710",
+                AgentUtils.getCurrentTime());
         String sinkDataTime = String.valueOf(profile.getSinkDataTime());
         try {
             String add2TimeZone = String.valueOf(
@@ -98,7 +99,8 @@ public class TestInstanceManager {
 
         // test continue
         profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", 
AgentUtils.getCurrentTime());
+                helper.getTestRootDir() + "/2023092710_1.txt", 
taskProfile.getCycleUnit(), "2023092710",
+                AgentUtils.getCurrentTime());
         action = new InstanceAction();
         action.setActionType(ActionType.ADD);
         action.setProfile(profile);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index 8f7d2d9d80..fc989b3bcf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.agent.plugin.task.filecollect;
 
-import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
 import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
 import org.apache.inlong.agent.plugin.utils.file.Files;
@@ -58,19 +57,19 @@ public class FileScanner {
 
     private static final Logger logger = 
LoggerFactory.getLogger(FileScanner.class);
 
-    public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, 
String originPattern, long startTime,
+    public static List<BasicFileInfo> scanTaskBetweenTimes(String 
originPattern, String cycleUnit, String timeOffset,
+            long startTime,
             long endTime, boolean isRetry) {
-        String cycleUnit = conf.getCycleUnit();
         if (!isRetry) {
-            startTime += NewDateUtils.calcOffset(conf.getTimeOffset());
-            endTime += NewDateUtils.calcOffset(conf.getTimeOffset());
+            startTime += NewDateUtils.calcOffset(timeOffset);
+            endTime += NewDateUtils.calcOffset(timeOffset);
         }
         String strStartTime = 
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
         String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, 
cycleUnit);
-        logger.info("task {} this scan time is between {} and {}.",
-                new Object[]{conf.getTaskId(), strStartTime, strEndTime});
+        logger.info("{} scan time is between {} and {}",
+                new Object[]{originPattern, strStartTime, strEndTime});
 
-        return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, 
strStartTime, strEndTime);
+        return scanTaskBetweenTimes(cycleUnit, originPattern, strStartTime, 
strEndTime);
     }
 
     /* Scan log files and create tasks between two times. */
@@ -91,8 +90,7 @@ public class FileScanner {
                 // TODO the time is not YYYYMMDDHH
                 String dataTime = DateTransUtils.millSecConvertToTimeStr(time, 
cycleUnit);
                 BasicFileInfo info = new BasicFileInfo(file, dataTime);
-                logger.info("scan new task fileName {} ,dataTime {}", file,
-                        DateTransUtils.millSecConvertToTimeStr(time, 
cycleUnit));
+                logger.info("scan new task fileName {} ,dataTime {}", file, 
dataTime);
                 infos.add(info);
             }
         }
@@ -114,11 +112,9 @@ public class FileScanner {
             String fileName, long depth, int maxFileNum) {
         ArrayList<String> ret = new ArrayList<String>();
         ArrayList<File> readyFiles = new ArrayList<File>();
-
         if (!new File(firstDir).isDirectory()) {
             return ret;
         }
-
         for (File pathname : Files.find(firstDir).yieldFilesAndDirectories()
                 .recursive().withDepth((int) depth).withDirNameRegex(secondDir)
                 .withFileNameRegex(fileName)) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index dc183881c1..9dc7d26c11 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.task.filecollect;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.instance.ActionType;
 import org.apache.inlong.agent.core.instance.InstanceAction;
@@ -85,6 +86,7 @@ public class LogFileCollectTask extends Task {
     private boolean retry;
     private long startTime;
     private long endTime;
+    private boolean isRealTime = false;
     private boolean initOK = false;
     private Set<String> originPatterns;
     private long lastScanTime = 0;
@@ -96,7 +98,7 @@ public class LogFileCollectTask extends Task {
     @Override
     public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
         if (!isProfileValid(taskProfile)) {
-            LOGGER.error("task profile invalid {}", taskProfile);
+            LOGGER.error("task profile invalid {}", taskProfile.toJsonStr());
             return;
         }
         taskManager = (TaskManager) srcManager;
@@ -115,12 +117,15 @@ public class LogFileCollectTask extends Task {
         retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
         originPatterns = 
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
                 .collect(Collectors.toSet());
+        if 
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+            isRealTime = true;
+        }
         instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
                 basicDb);
         try {
             instanceManager.start();
         } catch (Exception e) {
-            LOGGER.error("start instance manager error {}", e.getMessage());
+            LOGGER.error("start instance manager error: ", e);
         }
     }
 
@@ -131,12 +136,16 @@ public class LogFileCollectTask extends Task {
         }
         boolean ret =
                 profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
-                        && profile.hasKey(TaskConstants.TASK_FILE_TIME_OFFSET)
                         && profile.hasKey(TaskConstants.FILE_MAX_NUM);
         if (!ret) {
             LOGGER.error("task profile needs file keys");
             return false;
         }
+        if 
(profile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) != 0 && 
!profile.hasKey(
+                TaskConstants.TASK_FILE_TIME_OFFSET)) {
+            LOGGER.error("task profile needs time offset");
+            return false;
+        }
         if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) {
             long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0);
             long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0);
@@ -176,8 +185,11 @@ public class LogFileCollectTask extends Task {
              * linux regular expression, we have to replace * to ., and 
replace . with \\. .
              */
             WatchService watchService = 
FileSystems.getDefault().newWatchService();
-            WatchEntity entity = new WatchEntity(watchService, originPattern, 
taskProfile.getCycleUnit(),
-                    taskProfile.getTimeOffset());
+            String timeOffset = "";
+            if (!isRealTime) {
+                timeOffset = taskProfile.getTimeOffset();
+            }
+            WatchEntity entity = new WatchEntity(watchService, originPattern, 
taskProfile.getCycleUnit(), timeOffset);
             entity.registerRecursively();
             watchers.put(originPattern, entity);
             watchFailedDirs.remove(originPattern);
@@ -187,6 +199,8 @@ public class LogFileCollectTask extends Task {
             } else {
                 LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e);
             }
+        } catch (Exception e) {
+            LOGGER.error("addPathPattern:", e);
         }
     }
 
@@ -311,7 +325,13 @@ public class LogFileCollectTask extends Task {
             startScanTime = currentTime + offset;
             endScanTime = currentTime;
         }
-        return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern, 
startScanTime, endScanTime, retry);
+        if (isRealTime) {
+            return FileScanner.scanTaskBetweenTimes(originPattern, 
CycleUnitType.HOUR, taskProfile.getTimeOffset(),
+                    startScanTime, endScanTime, retry);
+        } else {
+            return FileScanner.scanTaskBetweenTimes(originPattern, 
taskProfile.getCycleUnit(),
+                    taskProfile.getTimeOffset(), startScanTime, endScanTime, 
retry);
+        }
     }
 
     private void runForWatching() {
@@ -336,17 +356,7 @@ public class LogFileCollectTask extends Task {
             if (sameDataTimeEvents.isEmpty()) {
                 continue;
             }
-            /*
-             * Calculate whether the event needs to be processed at the 
current time based on its data time, business
-             * cycle, and offset
-             */
-            String dataTime = entry.getKey();
-            String shouldStartTime =
-                    NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
-            String currentTime = getCurrentTime();
-            if (currentTime.compareTo(shouldStartTime) >= 0) {
-                LOGGER.info("submit now taskId {}, dataTime {}, currentTime 
{}, shouldStartTime {}",
-                        new Object[]{getTaskId(), dataTime, currentTime, 
shouldStartTime});
+            if (isRealTime || shouldStartNow(entry.getKey())) {
                 /* These codes will sort the FileCreationEvents by create 
time. */
                 Set<InstanceProfile> sortedEvents = new 
TreeSet<>(sameDataTimeEvents.values());
                 /* Check the file end with event creation time in asc order. */
@@ -360,15 +370,23 @@ public class LogFileCollectTask extends Task {
                     }
                     sameDataTimeEvents.remove(fileName);
                 }
-            } else {
-                LOGGER.info("submit later taskId {}, dataTime {}, currentTime 
{}, shouldStartTime {}",
-                        new Object[]{getTaskId(), dataTime, currentTime, 
shouldStartTime});
             }
         }
     }
 
+    /*
+     * Calculate whether the event needs to be processed at the current time 
based on its data time, business cycle, and
+     * offset
+     */
+    private boolean shouldStartNow(String dataTime) {
+        String shouldStartTime =
+                NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
+        String currentTime = getCurrentTime();
+        return currentTime.compareTo(shouldStartTime) >= 0;
+    }
+
     private void removeTimeoutEven(Map<String, Map<String, InstanceProfile>> 
eventMap, boolean isRetry) {
-        if (isRetry) {
+        if (isRetry || isRealTime) {
             return;
         }
         for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
@@ -484,8 +502,14 @@ public class LogFileCollectTask extends Task {
             LOGGER.error("should not happen! may be {} has been deleted and 
add again", fileName);
             return;
         }
+        String cycleUnit = "";
+        if (isRealTime) {
+            cycleUnit = CycleUnitType.HOUR;
+        } else {
+            cycleUnit = taskProfile.getCycleUnit();
+        }
         InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
-                fileName, dataTime, fileUpdateTime);
+                fileName, cycleUnit, dataTime, fileUpdateTime);
         sameDataTimeEvents.put(fileName, instanceProfile);
         LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", 
taskProfile.getTaskId(), dataTime, fileName);
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
index 2280b2db5a..ae57acbca8 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
@@ -78,9 +78,6 @@ public class DateUtils {
                 ret = oneMatch;
             }
         }
-        if (ret.isEmpty()) {
-            throw new IllegalArgumentException("time pattern " + " not find in 
" + src);
-        }
         return ret;
     }
 
@@ -91,6 +88,9 @@ public class DateUtils {
         }
 
         String longestPattern = extractLongestTimeRegex(src);
+        if (longestPattern.isEmpty()) {
+            return new PathDateExpression(longestPattern, 
NonRegexPatternPosition.NONE);
+        }
         String regexSign = "\\^$*+?{(|[)]";
 
         String range = "+?*{";
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 62207acc81..706167788f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -242,6 +242,9 @@ public class NewDateUtils {
      * @return
      */
     public static long calcOffset(String timeOffset) {
+        if (timeOffset.length() == 0) {
+            return 0;
+        }
         String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
         int startIndex;
         int symbol;
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 0fae339400..4e8ce6b413 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -72,7 +72,7 @@ public class TestSenderManager {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING);
         profile = taskProfile.createInstanceProfile("", fileName,
-                "20230927", AgentUtils.getCurrentTime());
+                taskProfile.getCycleUnit(), "20230927", 
AgentUtils.getCurrentTime());
     }
 
     @AfterClass
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index b8e6d60fef..3397f9f58a 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -61,7 +61,7 @@ public class TestLogFileSource {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING);
         instanceProfile = taskProfile.createInstanceProfile("",
-                fileName, "20230928", AgentUtils.getCurrentTime());
+                fileName, taskProfile.getCycleUnit(), "20230928", 
AgentUtils.getCurrentTime());
     }
 
     private LogFileSource getSource() {
@@ -110,7 +110,7 @@ public class TestLogFileSource {
             msg = source.read();
             cnt++;
         }
-        await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+        await().atMost(30, TimeUnit.SECONDS).until(() -> 
source.sourceFinish());
         source.destroy();
         Assert.assertTrue(cnt == 3);
         Assert.assertTrue(srcLen == readLen);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
index 877f360ca8..89aa196ac2 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
@@ -61,7 +61,8 @@ public class TestMqttConnect {
 
                 @Override
                 public void run() {
-                    reader.init(jobProfile.createInstanceProfile("", "", "", 
AgentUtils.getCurrentTime()));
+                    reader.init(jobProfile.createInstanceProfile("", "", 
jobProfile.getCycleUnit(), "",
+                            AgentUtils.getCurrentTime()));
                     while (!reader.isFinished()) {
                         Message message = reader.read();
                         if (Objects.nonNull(message)) {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index 81d2433349..46860ae20a 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -51,6 +51,7 @@ public class TestUtils {
         Assert.assertTrue(NewDateUtils.calcOffset("0") == 0);
         Assert.assertTrue(NewDateUtils.calcOffset("1") == 0);
         Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);
+        Assert.assertTrue(NewDateUtils.calcOffset("") == 0);
     }
 
     public static String getTestTriggerProfile() {

Reply via email to