This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7954b7f5c6 [INLONG-9200][Agent] Fix bug: duplicate file collect instance (#9201)
7954b7f5c6 is described below

commit 7954b7f5c642762b84f71e610e2d9be463a9a4e9
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Nov 3 15:05:48 2023 +0800

    [INLONG-9200][Agent] Fix bug: duplicate file collect instance (#9201)
    
    * [INLONG-9200][Agent] Fix bug: duplicate file collect instance
    
    * [INLONG-9200][Agent] Fix bug: duplicate file collect instance
    
    * [INLONG-9200][Agent] Fix bug: duplicate file collect instance
---
 .../apache/inlong/agent/conf/InstanceProfile.java  | 10 ++++-
 .../org/apache/inlong/agent/conf/TaskProfile.java  |  4 +-
 .../inlong/agent/constant/TaskConstants.java       |  2 +
 .../org/apache/inlong/agent/db/InstanceDb.java     |  5 +--
 .../org/apache/inlong/agent/db/TaskProfileDb.java  |  5 +--
 .../agent/core/instance/InstanceManager.java       | 15 ++++++--
 .../inlong/agent/core/task/file/MemoryManager.java | 24 ++++++++----
 .../inlong/agent/core/task/file/TaskManager.java   |  4 +-
 .../agent/core/instance/TestInstanceManager.java   |  7 +++-
 .../agent/plugin/sinks/filecollect/ProxySink.java  |  3 +-
 .../inlong/agent/plugin/sources/LogFileSource.java |  3 +-
 .../task/filecollect/LogFileCollectTask.java       | 44 ++++++++++++++++------
 .../sinks/filecollect/TestSenderManager.java       |  2 +-
 .../agent/plugin/sources/TestLogFileSource.java    |  3 +-
 .../agent/plugin/sources/TestMqttConnect.java      |  3 +-
 15 files changed, 91 insertions(+), 43 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 024b7674eb..5592008085 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -93,9 +93,17 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
         setInt(INSTANCE_STATE, state.ordinal());
     }
 
+    public long getFileUpdateTime() {
+        return getLong(TaskConstants.FILE_UPDATE_TIME, 0);
+    }
+
+    public void setFileUpdateTime(long lastUpdateTime) {
+        setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime);
+    }
+
     @Override
     public boolean allRequiredKeyExist() {
-        return true;
+        return hasKey(TaskConstants.FILE_UPDATE_TIME);
     }
 
     /**
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 1040afa88a..319f1abf56 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -106,7 +106,8 @@ public class TaskProfile extends AbstractConfiguration {
         return GSON.toJson(getConfigStorage());
     }
 
-    public InstanceProfile createInstanceProfile(String instanceClass, String 
fileName, String dataTime) {
+    public InstanceProfile createInstanceProfile(String instanceClass, String 
fileName, String dataTime,
+            long fileUpdateTime) {
         InstanceProfile instanceProfile = 
InstanceProfile.parseJsonStr(toJsonStr());
         instanceProfile.setInstanceClass(instanceClass);
         instanceProfile.setInstanceId(fileName);
@@ -114,6 +115,7 @@ public class TaskProfile extends AbstractConfiguration {
         instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
         instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
         instanceProfile.setState(InstanceStateEnum.DEFAULT);
+        instanceProfile.setFileUpdateTime(fileUpdateTime);
         return instanceProfile;
     }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index bbf95cea76..fa2ac856fd 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -166,6 +166,8 @@ public class TaskConstants extends CommonConstants {
 
     public static final String INSTANCE_STATE = "instance.state";
 
+    public static final String FILE_UPDATE_TIME = "fileUpdateTime";
+
     public static final String LAST_UPDATE_TIME = "lastUpdateTime";
 
     public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; // 
TODO:delete it
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
index dfc8a17055..db41243fbf 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
@@ -65,10 +65,7 @@ public class InstanceDb {
                     instance.get(TaskConstants.INSTANCE_ID));
             KeyValueEntity entity = new KeyValueEntity(keyName,
                     instance.toJsonStr(), 
instance.get(TaskConstants.INSTANCE_ID));
-            KeyValueEntity oldEntity = db.put(entity);
-            if (oldEntity != null) {
-                LOGGER.warn("instance profile {} has been replaced", 
oldEntity.getKey());
-            }
+            db.put(entity);
         } else {
             LOGGER.error("instance profile invalid!");
         }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
index b524bb09db..d37c3f9b10 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
@@ -64,10 +64,7 @@ public class TaskProfileDb {
             String keyName = getKeyByTaskId(task.getTaskId());
             KeyValueEntity entity = new KeyValueEntity(keyName,
                     task.toJsonStr(), 
task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS));
-            KeyValueEntity oldEntity = db.put(entity);
-            if (oldEntity != null) {
-                LOGGER.warn("task profile {} has been replaced", 
oldEntity.getKey());
-            }
+            db.put(entity);
         }
     }
 
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index a6c8381de3..e209535db8 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
 public class InstanceManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceManager.class);
-    private static final int ACTION_QUEUE_CAPACITY = 100000;
+    private static final int ACTION_QUEUE_CAPACITY = 100;
     public static final int CORE_THREAD_SLEEP_TIME = 100;
     // task in db
     private final InstanceDb instanceDb;
@@ -236,7 +236,11 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void addInstance(InstanceProfile profile) {
-        LOGGER.info("addInstance taskId {} instanceId {}", taskId, 
profile.getInstanceId());
+        LOGGER.info("add instance taskId {} instanceId {}", taskId, 
profile.getInstanceId());
+        if (!shouldAddAgain(profile.getInstanceId(), 
profile.getFileUpdateTime())) {
+            LOGGER.info("shouldAddAgain returns false skip taskId {} 
instanceId {}", taskId, profile.getInstanceId());
+            return;
+        }
         addToDb(profile);
         addToMemory(profile);
     }
@@ -274,7 +278,7 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void addToDb(InstanceProfile profile) {
-        LOGGER.info("add instance to db instanceId {} ", 
profile.getInstanceId());
+        LOGGER.info("add instance to db state {} instanceId {}", 
profile.getState(), profile.getInstanceId());
         instanceDb.storeInstance(profile);
     }
 
@@ -287,7 +291,7 @@ public class InstanceManager extends AbstractDaemon {
             oldInstance.destroy();
             instanceMap.remove(instanceProfile.getInstanceId());
             LOGGER.error("old instance {} should not exist, try stop it first",
-                    instanceProfile);
+                    instanceProfile.getInstanceId());
         }
         LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
         try {
@@ -315,13 +319,16 @@ public class InstanceManager extends AbstractDaemon {
     public boolean shouldAddAgain(String fileName, long lastModifyTime) {
         InstanceProfile profileFromDb = instanceDb.getInstance(taskId, 
fileName);
         if (profileFromDb == null) {
+            LOGGER.debug("not in db should add {}", fileName);
             return true;
         } else {
             InstanceStateEnum state = profileFromDb.getState();
             if (state == InstanceStateEnum.FINISHED && lastModifyTime > 
profileFromDb.getModifyTime()) {
+                LOGGER.debug("finished but file update again {}", fileName);
                 return true;
             }
             if (state == InstanceStateEnum.DELETE) {
+                LOGGER.debug("delete and add again {}", fileName);
                 return true;
             }
             return false;
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
index fca9d37c72..a3bfc415e2 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.core.task.file;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +42,8 @@ public class MemoryManager {
     private static volatile MemoryManager memoryManager = null;
     private final AgentConfiguration conf;
     private ConcurrentHashMap<String, Semaphore> semaphoreMap = new 
ConcurrentHashMap<>();
+    private ConcurrentHashMap<String, Long> lastPrintTime = new 
ConcurrentHashMap<>();
+    private static final int PRINT_INTERVAL_MS = 1000;
 
     private MemoryManager() {
         this.conf = AgentConfiguration.getAgentConf();
@@ -48,14 +51,17 @@ public class MemoryManager {
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
+        lastPrintTime.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, 0L);
 
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
+        lastPrintTime.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, 0L);
 
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
+        lastPrintTime.put(AGENT_GLOBAL_WRITER_PERMIT, 0L);
     }
 
     /**
@@ -99,19 +105,23 @@ public class MemoryManager {
         return semaphore.availablePermits();
     }
 
-    public void printDetail(String semaphoreName) {
+    public void printDetail(String semaphoreName, String detail) {
         Semaphore semaphore = semaphoreMap.get(semaphoreName);
         if (semaphore == null) {
-            LOGGER.error("printDetail {} not exist");
+            LOGGER.error("printDetail {} not exist", semaphoreName);
             return;
         }
-        LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), 
semaphore.getQueueLength(),
-                semaphoreName);
+        if (AgentUtils.getCurrentTime() - lastPrintTime.get(semaphoreName) > 
PRINT_INTERVAL_MS) {
+            LOGGER.info("{} permit left {} wait {} {}", detail, 
semaphore.availablePermits(),
+                    semaphore.getQueueLength(),
+                    semaphoreName);
+            lastPrintTime.put(semaphoreName, AgentUtils.getCurrentTime());
+        }
     }
 
     public void printAll() {
-        printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
-        printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
-        printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+        printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT, "printAll");
+        printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT, "printAll");
+        printDetail(AGENT_GLOBAL_WRITER_PERMIT, "printAll");
     }
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 991be20c05..9aa71a96ab 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -369,7 +369,7 @@ public class TaskManager extends AbstractDaemon {
      */
     private void addToDb(TaskProfile taskProfile) {
         if (taskDb.getTask(taskProfile.getTaskId()) != null) {
-            LOGGER.error("task {} should not exist", taskProfile);
+            LOGGER.error("task {} should not exist", taskProfile.getTaskId());
         }
         taskDb.storeTask(taskProfile);
     }
@@ -398,7 +398,7 @@ public class TaskManager extends AbstractDaemon {
             oldTask.destroy();
             taskMap.remove(taskProfile.getTaskId());
             LOGGER.error("old task {} should not exist, try stop it first",
-                    taskProfile);
+                    taskProfile.getTaskId());
         }
         try {
             Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 0745c13c72..e858743cb3 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -64,7 +64,7 @@ public class TestInstanceManager {
     public void testInstanceManager() {
         long timeBefore = AgentUtils.getCurrentTime();
         InstanceProfile profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/20230927_1.txt", "20230927");
+                helper.getTestRootDir() + "/20230927_1.txt", "20230927", 
AgentUtils.getCurrentTime());
         String instanceId = profile.getInstanceId();
         InstanceAction action = new InstanceAction();
         action.setActionType(ActionType.ADD);
@@ -85,8 +85,11 @@ public class TestInstanceManager {
         Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(), 
AgentUtils.getCurrentTime()));
 
         // test continue
+        profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
+                helper.getTestRootDir() + "/20230927_1.txt", "20230927", 
AgentUtils.getCurrentTime());
+        action = new InstanceAction();
         action.setActionType(ActionType.ADD);
-        profile.setState(InstanceStateEnum.DEFAULT);
+        action.setProfile(profile);
         manager.submitAction(action);
         await().atMost(1, TimeUnit.SECONDS).until(() -> 
manager.getInstance(instanceId) != null);
         Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == 
InstanceStateEnum.DEFAULT);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 4657cd4bfc..30bbf2d956 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -96,8 +96,7 @@ public class ProxySink extends AbstractSink {
             boolean writerPermitSuc = MemoryManager.getInstance()
                     .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
             if (!writerPermitSuc) {
-                LOGGER.warn("writer tryAcquire failed");
-                
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+                
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT, "proxy 
sink");
                 return false;
             }
             cache.generateExtraMap(proxyMessage.getDataKey());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index ea0e63c95f..1e187e2bf4 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -375,8 +375,7 @@ public class LogFileSource extends AbstractSource {
         while (!suc) {
             suc = MemoryManager.getInstance().tryAcquire(permitName, 
permitLen);
             if (!suc) {
-                LOGGER.warn("get permit {} failed", permitName);
-                MemoryManager.getInstance().printDetail(permitName);
+                MemoryManager.getInstance().printDetail(permitName, "log file 
source");
                 if (!isRunnable()) {
                     return false;
                 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 26007d0290..11d9330274 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -273,6 +273,17 @@ public class LogFileCollectTask extends Task {
         });
     }
 
+    private boolean isInEventMap(String fileName, String dataTime) {
+        Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
+        if (fileToProfile == null) {
+            return false;
+        }
+        if (fileToProfile.get(fileName) == null) {
+            return false;
+        }
+        return true;
+    }
+
     private List<BasicFileInfo> scanExistingFileByPattern(String 
originPattern) {
         long startScanTime = startTime;
         long endScanTime = endTime;
@@ -305,14 +316,20 @@ public class LogFileCollectTask extends Task {
         removeTimeoutEven(eventMap, retry);
         for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
             Map<String, InstanceProfile> sameDataTimeEvents = entry.getValue();
-            // 根据event的数据时间、业务的周期、偏移量计算出该event是否需要在当前时间处理
+            if (sameDataTimeEvents.isEmpty()) {
+                return;
+            }
+            /*
+             * Calculate whether the event needs to be processed at the 
current time based on its data time, business
+             * cycle, and offset
+             */
             String dataTime = entry.getKey();
             String shouldStartTime =
                     NewDateUtils.getShouldStartTime(dataTime, 
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
             String currentTime = getCurrentTime();
-            LOGGER.info("taskId {}, dataTime {}, currentTime {}, 
shouldStartTime {}",
-                    new Object[]{getTaskId(), dataTime, currentTime, 
shouldStartTime});
             if (currentTime.compareTo(shouldStartTime) >= 0) {
+                LOGGER.info("submit now taskId {}, dataTime {}, currentTime 
{}, shouldStartTime {}",
+                        new Object[]{getTaskId(), dataTime, currentTime, 
shouldStartTime});
                 /* These codes will sort the FileCreationEvents by create 
time. */
                 Set<InstanceProfile> sortedEvents = new 
TreeSet<>(sameDataTimeEvents.values());
                 /* Check the file end with event creation time in asc order. */
@@ -326,6 +343,9 @@ public class LogFileCollectTask extends Task {
                     }
                     sameDataTimeEvents.remove(fileName);
                 }
+            } else {
+                LOGGER.info("submit later taskId {}, dataTime {}, currentTime 
{}, shouldStartTime {}",
+                        new Object[]{getTaskId(), dataTime, currentTime, 
shouldStartTime});
             }
         }
     }
@@ -335,7 +355,7 @@ public class LogFileCollectTask extends Task {
             return;
         }
         for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
-            // 如果event的数据时间在当前时间前(后)2天之内,则有效
+            /* If the data time of the event is within 2 days before (after) 
the current time, it is valid */
             String dataTime = entry.getKey();
             if (!NewDateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
                 /* Remove it from memory map. */
@@ -429,9 +449,11 @@ public class LogFileCollectTask extends Task {
     }
 
     private void addToEvenMap(String fileName, String dataTime) {
-        Long lastModifyTime = FileUtils.getFileLastModifyTime(fileName);
-        if (!instanceManager.shouldAddAgain(fileName, lastModifyTime)) {
-            LOGGER.info("file {} has record in db", fileName);
+        if (isInEventMap(fileName, dataTime)) {
+            return;
+        }
+        Long fileUpdateTime = FileUtils.getFileLastModifyTime(fileName);
+        if (!instanceManager.shouldAddAgain(fileName, fileUpdateTime)) {
             return;
         }
         Map<String, InstanceProfile> sameDataTimeEvents = 
eventMap.computeIfAbsent(dataTime,
@@ -442,7 +464,7 @@ public class LogFileCollectTask extends Task {
             return;
         }
         InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
-                fileName, dataTime);
+                fileName, dataTime, fileUpdateTime);
         sameDataTimeEvents.put(fileName, instanceProfile);
     }
 
@@ -467,12 +489,12 @@ public class LogFileCollectTask extends Task {
          * For this case, we can simple think that the next file creation 
means the last task of this conf should finish
          * reading and start reading this new file.
          */
-        // 从文件名称中提取数据时间
+        // Extract data time from file name
         String fileTime = NewDateUtils.getDateTime(fileName, originPattern, 
dateExpression);
 
         /**
-         * 将文件时间中任意非数字字符替换掉
-         * 如2015-09-16_00替换成2015091600
+         * Replace any non-numeric characters in the file time
+         * such as 2015-09-16_00 replace with 2015091600
          */
         return fileTime.replaceAll("\\D", "");
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 084af58851..bf96afac74 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -74,7 +74,7 @@ public class TestSenderManager {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING);
         profile = taskProfile.createInstanceProfile("", fileName,
-                "20230927");
+                "20230927", AgentUtils.getCurrentTime());
     }
 
     @AfterClass
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 23b5027812..5617976077 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.core.task.file.MemoryManager;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
 import com.google.gson.Gson;
@@ -60,7 +61,7 @@ public class TestLogFileSource {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING);
         instanceProfile = taskProfile.createInstanceProfile("",
-                fileName, "20230928");
+                fileName, "20230928", AgentUtils.getCurrentTime());
 
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
index 73f999230d..877f360ca8 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.MqttReader;
+import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.junit.Ignore;
 import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public class TestMqttConnect {
 
                 @Override
                 public void run() {
-                    reader.init(jobProfile.createInstanceProfile("", "", ""));
+                    reader.init(jobProfile.createInstanceProfile("", "", "", 
AgentUtils.getCurrentTime()));
                     while (!reader.isFinished()) {
                         Message message = reader.read();
                         if (Objects.nonNull(message)) {

Reply via email to