This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 7954b7f5c6 [INLONG-9200][Agent] Fix bug: duplicate file collect instance (#9201) 7954b7f5c6 is described below commit 7954b7f5c642762b84f71e610e2d9be463a9a4e9 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Nov 3 15:05:48 2023 +0800 [INLONG-9200][Agent] Fix bug: duplicate file collect instance (#9201) * [INLONG-9200][Agent] Fix bug: duplicate file collect instance * [INLONG-9200][Agent] Fix bug: duplicate file collect instance * [INLONG-9200][Agent] Fix bug: duplicate file collect instance --- .../apache/inlong/agent/conf/InstanceProfile.java | 10 ++++- .../org/apache/inlong/agent/conf/TaskProfile.java | 4 +- .../inlong/agent/constant/TaskConstants.java | 2 + .../org/apache/inlong/agent/db/InstanceDb.java | 5 +-- .../org/apache/inlong/agent/db/TaskProfileDb.java | 5 +-- .../agent/core/instance/InstanceManager.java | 15 ++++++-- .../inlong/agent/core/task/file/MemoryManager.java | 24 ++++++++---- .../inlong/agent/core/task/file/TaskManager.java | 4 +- .../agent/core/instance/TestInstanceManager.java | 7 +++- .../agent/plugin/sinks/filecollect/ProxySink.java | 3 +- .../inlong/agent/plugin/sources/LogFileSource.java | 3 +- .../task/filecollect/LogFileCollectTask.java | 44 ++++++++++++++++------ .../sinks/filecollect/TestSenderManager.java | 2 +- .../agent/plugin/sources/TestLogFileSource.java | 3 +- .../agent/plugin/sources/TestMqttConnect.java | 3 +- 15 files changed, 91 insertions(+), 43 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java index 024b7674eb..5592008085 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -93,9 +93,17 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable setInt(INSTANCE_STATE, state.ordinal()); } + public long getFileUpdateTime() { + return getLong(TaskConstants.FILE_UPDATE_TIME, 0); + } + + public void setFileUpdateTime(long lastUpdateTime) { + setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime); + } + @Override public boolean allRequiredKeyExist() { - return true; + return hasKey(TaskConstants.FILE_UPDATE_TIME); } /** diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index 1040afa88a..319f1abf56 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -106,7 +106,8 @@ public class TaskProfile extends AbstractConfiguration { return GSON.toJson(getConfigStorage()); } - public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String dataTime) { + public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String dataTime, + long fileUpdateTime) { InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr()); instanceProfile.setInstanceClass(instanceClass); instanceProfile.setInstanceId(fileName); @@ -114,6 +115,7 @@ public class TaskProfile extends AbstractConfiguration { instanceProfile.setCreateTime(AgentUtils.getCurrentTime()); instanceProfile.setModifyTime(AgentUtils.getCurrentTime()); instanceProfile.setState(InstanceStateEnum.DEFAULT); + instanceProfile.setFileUpdateTime(fileUpdateTime); return instanceProfile; } } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index bbf95cea76..fa2ac856fd 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -166,6 +166,8 @@ public class TaskConstants extends CommonConstants { public static final String INSTANCE_STATE = "instance.state"; + public static final String FILE_UPDATE_TIME = "fileUpdateTime"; + public static final String LAST_UPDATE_TIME = "lastUpdateTime"; public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; // TODO:delete it diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java index dfc8a17055..db41243fbf 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java @@ -65,10 +65,7 @@ public class InstanceDb { instance.get(TaskConstants.INSTANCE_ID)); KeyValueEntity entity = new KeyValueEntity(keyName, instance.toJsonStr(), instance.get(TaskConstants.INSTANCE_ID)); - KeyValueEntity oldEntity = db.put(entity); - if (oldEntity != null) { - LOGGER.warn("instance profile {} has been replaced", oldEntity.getKey()); - } + db.put(entity); } else { LOGGER.error("instance profile invalid!"); } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java index b524bb09db..d37c3f9b10 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java @@ -64,10 +64,7 @@ public class TaskProfileDb { String keyName = getKeyByTaskId(task.getTaskId()); KeyValueEntity entity = new KeyValueEntity(keyName, task.toJsonStr(), task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS)); - KeyValueEntity oldEntity = db.put(entity); - if (oldEntity != null) { - LOGGER.warn("task profile {} has been replaced", oldEntity.getKey()); - } + db.put(entity); } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index a6c8381de3..e209535db8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit; public class InstanceManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceManager.class); - private static final int ACTION_QUEUE_CAPACITY = 100000; + private static final int ACTION_QUEUE_CAPACITY = 100; public static final int CORE_THREAD_SLEEP_TIME = 100; // task in db private final InstanceDb instanceDb; @@ -236,7 +236,11 @@ public class InstanceManager extends AbstractDaemon { } private void addInstance(InstanceProfile profile) { - LOGGER.info("addInstance taskId {} instanceId {}", taskId, profile.getInstanceId()); + LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId()); + if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) { + LOGGER.info("shouldAddAgain returns false skip taskId {} instanceId {}", taskId, profile.getInstanceId()); + return; + } addToDb(profile); addToMemory(profile); } @@ -274,7 +278,7 @@ public class InstanceManager extends AbstractDaemon { } private void addToDb(InstanceProfile profile) { - LOGGER.info("add instance to db instanceId {} ", profile.getInstanceId()); + LOGGER.info("add instance to db state {} instanceId {}", profile.getState(), profile.getInstanceId()); instanceDb.storeInstance(profile); } @@ -287,7 +291,7 @@ public class InstanceManager extends AbstractDaemon { oldInstance.destroy(); instanceMap.remove(instanceProfile.getInstanceId()); LOGGER.error("old instance {} should not exist, try stop it first", - instanceProfile); + instanceProfile.getInstanceId()); } LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr()); try { @@ -315,13 +319,16 @@ public class InstanceManager extends AbstractDaemon { public boolean shouldAddAgain(String fileName, long lastModifyTime) { InstanceProfile profileFromDb = instanceDb.getInstance(taskId, fileName); if (profileFromDb == null) { + LOGGER.debug("not in db should add {}", fileName); return true; } else { InstanceStateEnum state = profileFromDb.getState(); if (state == InstanceStateEnum.FINISHED && lastModifyTime > profileFromDb.getModifyTime()) { + LOGGER.debug("finished but file update again {}", fileName); return true; } if (state == InstanceStateEnum.DELETE) { + LOGGER.debug("delete and add again {}", fileName); return true; } return false; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java index fca9d37c72..a3bfc415e2 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.core.task.file; import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,8 @@ public class MemoryManager { private static volatile MemoryManager memoryManager = null; private final AgentConfiguration conf; private ConcurrentHashMap<String, Semaphore> semaphoreMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Long> lastPrintTime = new ConcurrentHashMap<>(); + private static final int PRINT_INTERVAL_MS = 1000; private MemoryManager() { this.conf = AgentConfiguration.getAgentConf(); @@ -48,14 +51,17 @@ public class MemoryManager { semaphore = new Semaphore( conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT)); semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore); + lastPrintTime.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, 0L); semaphore = new Semaphore( conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT)); semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore); + lastPrintTime.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, 0L); semaphore = new Semaphore( conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT)); semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore); + lastPrintTime.put(AGENT_GLOBAL_WRITER_PERMIT, 0L); } /** @@ -99,19 +105,23 @@ public class MemoryManager { return semaphore.availablePermits(); } - public void printDetail(String semaphoreName) { + public void printDetail(String semaphoreName, String detail) { Semaphore semaphore = semaphoreMap.get(semaphoreName); if (semaphore == null) { - LOGGER.error("printDetail {} not exist"); + LOGGER.error("printDetail {} not exist", semaphoreName); return; } - LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), semaphore.getQueueLength(), - semaphoreName); + if (AgentUtils.getCurrentTime() - lastPrintTime.get(semaphoreName) > PRINT_INTERVAL_MS) { + LOGGER.info("{} permit left {} wait {} {}", detail, semaphore.availablePermits(), + semaphore.getQueueLength(), + semaphoreName); + lastPrintTime.put(semaphoreName, AgentUtils.getCurrentTime()); + } } public void printAll() { - printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT); - printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT); - printDetail(AGENT_GLOBAL_WRITER_PERMIT); + printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT, "printAll"); + printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT, "printAll"); + printDetail(AGENT_GLOBAL_WRITER_PERMIT, "printAll"); } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index 991be20c05..9aa71a96ab 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -369,7 +369,7 @@ public class TaskManager extends AbstractDaemon { */ private void addToDb(TaskProfile taskProfile) { if (taskDb.getTask(taskProfile.getTaskId()) != null) { - LOGGER.error("task {} should not exist", taskProfile); + LOGGER.error("task {} should not exist", taskProfile.getTaskId()); } taskDb.storeTask(taskProfile); } @@ -398,7 +398,7 @@ public class TaskManager extends AbstractDaemon { oldTask.destroy(); taskMap.remove(taskProfile.getTaskId()); LOGGER.error("old task {} should not exist, try stop it first", - taskProfile); + taskProfile.getTaskId()); } try { Class<?> taskClass = Class.forName(taskProfile.getTaskClass()); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index 0745c13c72..e858743cb3 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -64,7 +64,7 @@ public class TestInstanceManager { public void testInstanceManager() { long timeBefore = AgentUtils.getCurrentTime(); InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), - helper.getTestRootDir() + "/20230927_1.txt", "20230927"); + helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime()); String instanceId = profile.getInstanceId(); InstanceAction action = new InstanceAction(); action.setActionType(ActionType.ADD); @@ -85,8 +85,11 @@ public class TestInstanceManager { Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(), AgentUtils.getCurrentTime())); // test continue + profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), + helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime()); + action = new InstanceAction(); action.setActionType(ActionType.ADD); - profile.setState(InstanceStateEnum.DEFAULT); + action.setProfile(profile); manager.submitAction(action); await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstance(instanceId) != null); Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == InstanceStateEnum.DEFAULT); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java index 4657cd4bfc..30bbf2d956 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java @@ -96,8 +96,7 @@ public class ProxySink extends AbstractSink { boolean writerPermitSuc = MemoryManager.getInstance() .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length); if (!writerPermitSuc) { - LOGGER.warn("writer tryAcquire failed"); - MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT); + MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT, "proxy sink"); return false; } cache.generateExtraMap(proxyMessage.getDataKey()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index ea0e63c95f..1e187e2bf4 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -375,8 +375,7 @@ public class LogFileSource extends AbstractSource { while (!suc) { suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen); if (!suc) { - LOGGER.warn("get permit {} failed", permitName); - MemoryManager.getInstance().printDetail(permitName); + MemoryManager.getInstance().printDetail(permitName, "log file source"); if (!isRunnable()) { return false; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 26007d0290..11d9330274 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -273,6 +273,17 @@ public class LogFileCollectTask extends Task { }); } + private boolean isInEventMap(String fileName, String dataTime) { + Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime); + if (fileToProfile == null) { + return false; + } + if (fileToProfile.get(fileName) == null) { + return false; + } + return true; + } + private List<BasicFileInfo> scanExistingFileByPattern(String originPattern) { long startScanTime = startTime; long endScanTime = endTime; @@ -305,14 +316,20 @@ public class LogFileCollectTask extends Task { removeTimeoutEven(eventMap, retry); for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) { Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue(); - // 根据event的数据时间、业务的周期、偏移量计算出该event是否需要在当前时间处理 + if (sameDataTimeEvents.isEmpty()) { + return; + } + /* + * Calculate whether the event needs to be processed at the current time based on its data time, business + * cycle, and offset + */ String dataTime = entry.getKey(); String shouldStartTime = NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), taskProfile.getTimeOffset()); String currentTime = getCurrentTime(); - LOGGER.info("taskId {}, dataTime {}, currentTime {}, shouldStartTime {}", - new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime}); if (currentTime.compareTo(shouldStartTime) >= 0) { + LOGGER.info("submit now taskId {}, dataTime {}, currentTime {}, shouldStartTime {}", + new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime}); /* These codes will sort the FileCreationEvents by create time. */ Set<InstanceProfile> sortedEvents = new TreeSet<>(sameDataTimeEvents.values()); /* Check the file end with event creation time in asc order. */ @@ -326,6 +343,9 @@ public class LogFileCollectTask extends Task { } sameDataTimeEvents.remove(fileName); } + } else { + LOGGER.info("submit later taskId {}, dataTime {}, currentTime {}, shouldStartTime {}", + new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime}); } } } @@ -335,7 +355,7 @@ public class LogFileCollectTask extends Task { return; } for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) { - // 如果event的数据时间在当前时间前(后)2天之内,则有效 + /* If the data time of the event is within 2 days before (after) the current time, it is valid */ String dataTime = entry.getKey(); if (!NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)) { /* Remove it from memory map. */ @@ -429,9 +449,11 @@ public class LogFileCollectTask extends Task { } private void addToEvenMap(String fileName, String dataTime) { - Long lastModifyTime = FileUtils.getFileLastModifyTime(fileName); - if (!instanceManager.shouldAddAgain(fileName, lastModifyTime)) { - LOGGER.info("file {} has record in db", fileName); + if (isInEventMap(fileName, dataTime)) { + return; + } + Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName); + if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) { return; } Map<String, InstanceProfile> sameDataTimeEvents = eventMap.computeIfAbsent(dataTime, @@ -442,7 +464,7 @@ public class LogFileCollectTask extends Task { return; } InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE, - fileName, dataTime); + fileName, dataTime, fileUpdateTime); sameDataTimeEvents.put(fileName, instanceProfile); } @@ -467,12 +489,12 @@ public class LogFileCollectTask extends Task { * For this case, we can simple think that the next file creation means the last task of this conf should finish * reading and start reading this new file. */ - // 从文件名称中提取数据时间 + // Extract data time from file name String fileTime = NewDateUtils.getDateTime(fileName, originPattern, dateExpression); /** - * 将文件时间中任意非数字字符替换掉 - * 如2015-09-16_00替换成2015091600 + * Replace any non-numeric characters in the file time + * such as 2015-09-16_00 replace with 2015091600 */ return fileTime.replaceAll("\\D", ""); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 084af58851..bf96afac74 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -74,7 +74,7 @@ public class TestSenderManager { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); profile = taskProfile.createInstanceProfile("", fileName, - "20230927"); + "20230927", AgentUtils.getCurrentTime()); } @AfterClass diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 23b5027812..5617976077 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -24,6 +24,7 @@ import org.apache.inlong.agent.core.task.file.MemoryManager; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; import com.google.gson.Gson; @@ -60,7 +61,7 @@ public class TestLogFileSource { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); instanceProfile = taskProfile.createInstanceProfile("", - fileName, "20230928"); + fileName, "20230928", AgentUtils.getCurrentTime()); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java index 73f999230d..877f360ca8 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java @@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.reader.MqttReader; +import org.apache.inlong.agent.utils.AgentUtils; import org.junit.Ignore; import org.slf4j.Logger; @@ -60,7 +61,7 @@ public class TestMqttConnect { @Override public void run() { - reader.init(jobProfile.createInstanceProfile("", "", "")); + reader.init(jobProfile.createInstanceProfile("", "", "", AgentUtils.getCurrentTime())); while (!reader.isFinished()) { Message message = reader.read(); if (Objects.nonNull(message)) {