This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new f9c28bad17 [INLONG-9244][Agent] Fix bug: miss file from next data time (#9245)
f9c28bad17 is described below

commit f9c28bad17278d193038ae77a88dba62b1c8a40d
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 9 09:56:25 2023 +0800

    [INLONG-9244][Agent] Fix bug: miss file from next data time (#9245)
---
 .../agent/plugin/task/filecollect/LogFileCollectTask.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 1629b89595..26c61efa7a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -325,7 +325,7 @@ public class LogFileCollectTask extends Task {
         for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) {
             Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue();
             if (sameDataTimeEvents.isEmpty()) {
-                return;
+                continue;
             }
             /*
              * Calculate whether the event needs to be processed at the current time based on its data time, business
@@ -442,10 +442,10 @@ public class LogFileCollectTask extends Task {
 
     private void handleFilePath(Path filePath, WatchEntity entity) {
         String newFileName = filePath.toFile().getAbsolutePath();
-        LOGGER.info("[New File] {} {}", newFileName, entity.getOriginPattern());
+        LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern());
         Matcher matcher = entity.getPattern().matcher(newFileName);
         if (matcher.matches() || matcher.lookingAt()) {
-            LOGGER.info("[Matched File] {} {}", newFileName, entity.getOriginPattern());
+            LOGGER.info("matched file {} {}", newFileName, entity.getOriginPattern());
             String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(),
                     entity.getDateExpression());
             if (!checkFileNameForTime(newFileName, entity)) {
@@ -458,10 +458,14 @@ public class LogFileCollectTask extends Task {
 
     private void addToEvenMap(String fileName, String dataTime) {
         if (isInEventMap(fileName, dataTime)) {
+            LOGGER.info("addToEvenMap isInEventMap returns true skip taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
             return;
         }
         Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
         if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
+            LOGGER.info("addToEvenMap shouldAddAgain returns false skip taskId {} dataTime {} fileName {}",
+                    taskProfile.getTaskId(), dataTime, fileName);
             return;
         }
         Map<String, InstanceProfile> sameDataTimeEvents = eventMap.computeIfAbsent(dataTime,
@@ -474,6 +478,7 @@ public class LogFileCollectTask extends Task {
         InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
                 fileName, dataTime, fileUpdateTime);
         sameDataTimeEvents.put(fileName, instanceProfile);
+        LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}", taskProfile.getTaskId(), dataTime, fileName);
     }
 
     private boolean checkFileNameForTime(String newFileName, WatchEntity entity) {