This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8a97c353dd [INLONG-9335][Agent] Bring cycle parameters when creating an instance (#9336) 8a97c353dd is described below commit 8a97c353ddcabf70e319d9e006293dda383d15ae Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Nov 27 14:43:23 2023 +0800 [INLONG-9335][Agent] Bring cycle parameters when creating an instance (#9336) * [INLONG-9335][Agent] Bring cycle parameters when creating an instance * [INLONG-9335][Agent] Bring cycle parameters when creating an instance --- .../org/apache/inlong/agent/conf/TaskProfile.java | 7 ++- .../inlong/agent/constant/CycleUnitType.java | 25 ++++++++ .../inlong/agent/constant/FileTriggerType.java | 66 --------------------- .../apache/inlong/agent/utils/DateTransUtils.java | 2 +- .../agent/core/instance/TestInstanceManager.java | 6 +- .../agent/plugin/task/filecollect/FileScanner.java | 20 +++---- .../task/filecollect/LogFileCollectTask.java | 68 +++++++++++++++------- .../inlong/agent/plugin/utils/file/DateUtils.java | 6 +- .../agent/plugin/utils/file/NewDateUtils.java | 3 + .../sinks/filecollect/TestSenderManager.java | 2 +- .../agent/plugin/sources/TestLogFileSource.java | 4 +- .../agent/plugin/sources/TestMqttConnect.java | 3 +- .../inlong/agent/plugin/utils/TestUtils.java | 1 + 13 files changed, 100 insertions(+), 113 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index cbad21e499..de863a7aa0 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -62,7 +62,7 @@ public class TaskProfile extends AbstractConfiguration { } public String getTimeOffset() { - return get(TaskConstants.TASK_FILE_TIME_OFFSET); + return get(TaskConstants.TASK_FILE_TIME_OFFSET, ""); } public String getTimeZone() { @@ -118,7 +118,8 @@ public class TaskProfile extends AbstractConfiguration { return GSON.toJson(getConfigStorage()); } - public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String dataTime, + public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String cycleUnit, + String dataTime, long fileUpdateTime) { InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr()); instanceProfile.setInstanceClass(instanceClass); @@ -126,7 +127,7 @@ public class TaskProfile extends AbstractConfiguration { instanceProfile.setSourceDataTime(dataTime); Long sinkDataTime = 0L; try { - sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime, getCycleUnit(), + sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime, cycleUnit, TimeZone.getTimeZone(getTimeZone())); } catch (ParseException e) { logger.error("createInstanceProfile ParseException error: ", e); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java new file mode 100644 index 0000000000..d12e825b85 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.constant; + +public class CycleUnitType { + + public static final String DAY = "D"; + public static final String HOUR = "h"; + public static final String REAL_TIME = "R"; +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java deleted file mode 100644 index adc7109fa5..0000000000 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.constant; - -/** - * Collection type of file data - */ -public class FileTriggerType { - - /** - * Increment only collect newly created files content. - * - * <p>Here is an example. Collect task submit at '2022-01-01 23:00:00' with pattern '/bin/*.sh'. - * <blockquote><pre> - * . - * └── [2022-01-01 20:49:42] bin - * ├── [2022-01-01 20:10:00] managerctl - * ├── [2022-01-01 21:10:00] restart.sh - * ├── [2022-01-01 22:10:00] shutdown.sh - * └── [2022-01-01 23:49:00] startup.sh - * </pre></blockquote> - * - * <p>It Finally collect file is: - * <blockquote><pre> - * ./bin/startup.sh - * </pre></blockquote> - */ - public static final String INCREMENT = "INCREMENT"; - - /** - * FULL collect existing files, as well as newly created files. - * - * <p>Here is an example. Collect task submit at '2022-01-01 23:00:00' with pattern '/bin/*.sh'. - * <blockquote><pre> - * . - * └── [2022-01-01 20:49:42] bin - * ├── [2022-01-01 20:10:00] managerctl - * ├── [2022-01-01 21:10:00] restart.sh - * ├── [2022-01-01 22:10:00] shutdown.sh - * └── [2022-01-01 23:49:00] startup.sh - * </pre></blockquote> - * - * <p>It Finally collect file is: - * <blockquote><pre> - * ./bin/startup.sh - * ./bin/shutdown.sh - * ./bin/startup.sh - * </pre></blockquote> - */ - public static final String FULL = "FULL"; -} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java index 2aa08742e8..55182c7dd8 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -56,7 +56,7 @@ public class DateTransUtils { } else if (cycleUnit.contains("m") && time.length() == 12) { df = new SimpleDateFormat("yyyyMMddHHmm"); } else { - logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit); + logger.error("time {}, cycleUnit {} can't parse!", time, cycleUnit); throw new ParseException(time, 0); } try { diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index 910a32a46a..558bed0204 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -68,7 +68,8 @@ public class TestInstanceManager { public void testInstanceManager() { long timeBefore = AgentUtils.getCurrentTime(); InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), - helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime()); + helper.getTestRootDir() + "/2023092710_1.txt", taskProfile.getCycleUnit(), "2023092710", + AgentUtils.getCurrentTime()); String sinkDataTime = String.valueOf(profile.getSinkDataTime()); try { String add2TimeZone = String.valueOf( @@ -98,7 +99,8 @@ public class TestInstanceManager { // test continue profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), - helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime()); + helper.getTestRootDir() + "/2023092710_1.txt", taskProfile.getCycleUnit(), "2023092710", + AgentUtils.getCurrentTime()); action = new InstanceAction(); action.setActionType(ActionType.ADD); action.setProfile(profile); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java index 8f7d2d9d80..fc989b3bcf 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java @@ -17,7 +17,6 @@ package org.apache.inlong.agent.plugin.task.filecollect; -import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator; import org.apache.inlong.agent.plugin.utils.file.Files; @@ -58,19 +57,19 @@ public class FileScanner { private static final Logger logger = LoggerFactory.getLogger(FileScanner.class); - public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, long startTime, + public static List<BasicFileInfo> scanTaskBetweenTimes(String originPattern, String cycleUnit, String timeOffset, + long startTime, long endTime, boolean isRetry) { - String cycleUnit = conf.getCycleUnit(); if (!isRetry) { - startTime += NewDateUtils.calcOffset(conf.getTimeOffset()); - endTime += NewDateUtils.calcOffset(conf.getTimeOffset()); + startTime += NewDateUtils.calcOffset(timeOffset); + endTime += NewDateUtils.calcOffset(timeOffset); } String strStartTime = DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit); String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, cycleUnit); - logger.info("task {} this scan time is between {} and {}.", - new Object[]{conf.getTaskId(), strStartTime, strEndTime}); + logger.info("{} scan time is between {} and {}", + new Object[]{originPattern, strStartTime, strEndTime}); - return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, strStartTime, strEndTime); + return scanTaskBetweenTimes(cycleUnit, originPattern, strStartTime, strEndTime); } /* Scan log files and create tasks between two times. */ @@ -91,8 +90,7 @@ public class FileScanner { // TODO the time is not YYYYMMDDHH String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit); BasicFileInfo info = new BasicFileInfo(file, dataTime); - logger.info("scan new task fileName {} ,dataTime {}", file, - DateTransUtils.millSecConvertToTimeStr(time, cycleUnit)); + logger.info("scan new task fileName {} ,dataTime {}", file, dataTime); infos.add(info); } } @@ -114,11 +112,9 @@ public class FileScanner { String fileName, long depth, int maxFileNum) { ArrayList<String> ret = new ArrayList<String>(); ArrayList<File> readyFiles = new ArrayList<File>(); - if (!new File(firstDir).isDirectory()) { return ret; } - for (File pathname : Files.find(firstDir).yieldFilesAndDirectories() .recursive().withDepth((int) depth).withDirNameRegex(secondDir) .withFileNameRegex(fileName)) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index dc183881c1..9dc7d26c11 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.task.filecollect; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.instance.ActionType; import org.apache.inlong.agent.core.instance.InstanceAction; @@ -85,6 +86,7 @@ public class LogFileCollectTask extends Task { private boolean retry; private long startTime; private long endTime; + private boolean isRealTime = false; private boolean initOK = false; private Set<String> originPatterns; private long lastScanTime = 0; @@ -96,7 +98,7 @@ public class LogFileCollectTask extends Task { @Override public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException { if (!isProfileValid(taskProfile)) { - LOGGER.error("task profile invalid {}", taskProfile); + LOGGER.error("task profile invalid {}", taskProfile.toJsonStr()); return; } taskManager = (TaskManager) srcManager; @@ -115,12 +117,15 @@ public class LogFileCollectTask extends Task { retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false); originPatterns = Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(",")) .collect(Collectors.toSet()); + if (taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { + isRealTime = true; + } instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), basicDb); try { instanceManager.start(); } catch (Exception e) { - LOGGER.error("start instance manager error {}", e.getMessage()); + LOGGER.error("start instance manager error: ", e); } } @@ -131,12 +136,16 @@ public class LogFileCollectTask extends Task { } boolean ret = profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS) - && profile.hasKey(TaskConstants.TASK_FILE_TIME_OFFSET) && profile.hasKey(TaskConstants.FILE_MAX_NUM); if (!ret) { LOGGER.error("task profile needs file keys"); return false; } + if (profile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) != 0 && !profile.hasKey( + TaskConstants.TASK_FILE_TIME_OFFSET)) { + LOGGER.error("task profile needs time offset"); + return false; + } if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) { long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0); long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0); @@ -176,8 +185,11 @@ public class LogFileCollectTask extends Task { * linux regular expression, we have to replace * to ., and replace . with \\. . */ WatchService watchService = FileSystems.getDefault().newWatchService(); - WatchEntity entity = new WatchEntity(watchService, originPattern, taskProfile.getCycleUnit(), - taskProfile.getTimeOffset()); + String timeOffset = ""; + if (!isRealTime) { + timeOffset = taskProfile.getTimeOffset(); + } + WatchEntity entity = new WatchEntity(watchService, originPattern, taskProfile.getCycleUnit(), timeOffset); entity.registerRecursively(); watchers.put(originPattern, entity); watchFailedDirs.remove(originPattern); @@ -187,6 +199,8 @@ public class LogFileCollectTask extends Task { } else { LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e); } + } catch (Exception e) { + LOGGER.error("addPathPattern:", e); } } @@ -311,7 +325,13 @@ public class LogFileCollectTask extends Task { startScanTime = currentTime + offset; endScanTime = currentTime; } - return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern, startScanTime, endScanTime, retry); + if (isRealTime) { + return FileScanner.scanTaskBetweenTimes(originPattern, CycleUnitType.HOUR, taskProfile.getTimeOffset(), + startScanTime, endScanTime, retry); + } else { + return FileScanner.scanTaskBetweenTimes(originPattern, taskProfile.getCycleUnit(), + taskProfile.getTimeOffset(), startScanTime, endScanTime, retry); + } } private void runForWatching() { @@ -336,17 +356,7 @@ public class LogFileCollectTask extends Task { if (sameDataTimeEvents.isEmpty()) { continue; } - /* - * Calculate whether the event needs to be processed at the current time based on its data time, business - * cycle, and offset - */ - String dataTime = entry.getKey(); - String shouldStartTime = - NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), taskProfile.getTimeOffset()); - String currentTime = getCurrentTime(); - if (currentTime.compareTo(shouldStartTime) >= 0) { - LOGGER.info("submit now taskId {}, dataTime {}, currentTime {}, shouldStartTime {}", - new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime}); + if (isRealTime || shouldStartNow(entry.getKey())) { /* These codes will sort the FileCreationEvents by create time. */ Set<InstanceProfile> sortedEvents = new TreeSet<>(sameDataTimeEvents.values()); /* Check the file end with event creation time in asc order. */ @@ -360,15 +370,23 @@ public class LogFileCollectTask extends Task { } sameDataTimeEvents.remove(fileName); } - } else { - LOGGER.info("submit later taskId {}, dataTime {}, currentTime {}, shouldStartTime {}", - new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime}); } } } + /* + * Calculate whether the event needs to be processed at the current time based on its data time, business cycle, and + * offset + */ + private boolean shouldStartNow(String dataTime) { + String shouldStartTime = + NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), taskProfile.getTimeOffset()); + String currentTime = getCurrentTime(); + return currentTime.compareTo(shouldStartTime) >= 0; + } + private void removeTimeoutEven(Map<String, Map<String, InstanceProfile>> eventMap, boolean isRetry) { - if (isRetry) { + if (isRetry || isRealTime) { return; } for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) { @@ -484,8 +502,14 @@ public class LogFileCollectTask extends Task { LOGGER.error("should not happen! may be {} has been deleted and add again", fileName); return; } + String cycleUnit = ""; + if (isRealTime) { + cycleUnit = CycleUnitType.HOUR; + } else { + cycleUnit = taskProfile.getCycleUnit(); + } InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE, - fileName, dataTime, fileUpdateTime); + fileName, cycleUnit, dataTime, fileUpdateTime); sameDataTimeEvents.put(fileName, instanceProfile); LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", taskProfile.getTaskId(), dataTime, fileName); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java index 2280b2db5a..ae57acbca8 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java @@ -78,9 +78,6 @@ public class DateUtils { ret = oneMatch; } } - if (ret.isEmpty()) { - throw new IllegalArgumentException("time pattern " + " not find in " + src); - } return ret; } @@ -91,6 +88,9 @@ public class DateUtils { } String longestPattern = extractLongestTimeRegex(src); + if (longestPattern.isEmpty()) { + return new PathDateExpression(longestPattern, NonRegexPatternPosition.NONE); + } String regexSign = "\\^$*+?{(|[)]"; String range = "+?*{"; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index 62207acc81..706167788f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -242,6 +242,9 @@ public class NewDateUtils { * @return */ public static long calcOffset(String timeOffset) { + if (timeOffset.length() == 0) { + return 0; + } String offsetUnit = timeOffset.substring(timeOffset.length() - 1); int startIndex; int symbol; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 0fae339400..4e8ce6b413 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -72,7 +72,7 @@ public class TestSenderManager { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); profile = taskProfile.createInstanceProfile("", fileName, - "20230927", AgentUtils.getCurrentTime()); + taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); } @AfterClass diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index b8e6d60fef..3397f9f58a 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -61,7 +61,7 @@ public class TestLogFileSource { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); instanceProfile = taskProfile.createInstanceProfile("", - fileName, "20230928", AgentUtils.getCurrentTime()); + fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime()); } private LogFileSource getSource() { @@ -110,7 +110,7 @@ public class TestLogFileSource { msg = source.read(); cnt++; } - await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish()); + await().atMost(30, TimeUnit.SECONDS).until(() -> source.sourceFinish()); source.destroy(); Assert.assertTrue(cnt == 3); Assert.assertTrue(srcLen == readLen); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java index 877f360ca8..89aa196ac2 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java @@ -61,7 +61,8 @@ public class TestMqttConnect { @Override public void run() { - reader.init(jobProfile.createInstanceProfile("", "", "", AgentUtils.getCurrentTime())); + reader.init(jobProfile.createInstanceProfile("", "", jobProfile.getCycleUnit(), "", + AgentUtils.getCurrentTime())); while (!reader.isFinished()) { Message message = reader.read(); if (Objects.nonNull(message)) { diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java index 81d2433349..46860ae20a 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java @@ -51,6 +51,7 @@ public class TestUtils { Assert.assertTrue(NewDateUtils.calcOffset("0") == 0); Assert.assertTrue(NewDateUtils.calcOffset("1") == 0); Assert.assertTrue(NewDateUtils.calcOffset("10") == 0); + Assert.assertTrue(NewDateUtils.calcOffset("") == 0); } public static String getTestTriggerProfile() {