This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 82cc76e41a [INLONG-11752][Agent] Modify the default collection range of data (#11753)
82cc76e41a is described below

commit 82cc76e41af429b316241d5dafb0e1474bd054cc
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Feb 13 19:53:48 2025 +0800

    [INLONG-11752][Agent] Modify the default collection range of data (#11753)
---
 .../inlong/agent/constant/AgentConstants.java      |  6 +++++
 .../agent/core/instance/InstanceManager.java       | 14 +++++-----
 .../inlong/agent/core/task/OffsetManager.java      | 31 +++++++++++++++++++---
 .../inlong/agent/plugin/sinks/ProxySink.java       | 11 ++++----
 .../SenderManager.java => Sender.java}             |  8 +++---
 .../plugin/task/logcollection/LogAbstractTask.java | 10 ++++---
 .../inlong/agent/plugin/utils/regex/Scanner.java   |  7 +++--
 .../inlong/agent/plugin/sinks/KafkaSinkTest.java   |  6 ++---
 .../inlong/agent/plugin/sinks/PulsarSinkTest.java  |  6 ++---
 .../{TestSenderManager.java => TestSender.java}    | 21 ++++++++-------
 pom.xml                                            |  6 +++++
 11 files changed, 83 insertions(+), 43 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index f7c0f0d7c8..0c73c12852 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -76,6 +76,12 @@ public class AgentConstants {
     public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit";
     public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;
 
+    public static final String AGENT_SCAN_RANGE = "agent.scan.range";
+    public static final String DEFAULT_AGENT_SCAN_RANGE = "-2";
+    public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
+    public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
+    public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+
     // pulsar sink config
     public static final String PULSAR_CLIENT_IO_TREHAD_NUM = 
"agent.sink.pulsar.client.io.thread.num";
     public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1,
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3f0a914e90..23c16cabe8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -56,7 +56,9 @@ public class InstanceManager extends AbstractDaemon {
     public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
     public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
     public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
+    public static final long KEEP_PACE_INTERVAL_MS = 60 * 1000;
     private long lastPrintTime = 0;
+    private long lastTraverseTime = 0;
     // instance in instance store
     private final InstanceStore instanceStore;
     private TaskStore taskStore;
@@ -67,7 +69,7 @@ public class InstanceManager extends AbstractDaemon {
     private final BlockingQueue<InstanceAction> actionQueue;
     private final BlockingQueue<InstanceAction> addActionQueue;
     // task thread pool;
-    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+    private final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
             new SynchronousQueue<>(),
@@ -77,10 +79,8 @@ public class InstanceManager extends AbstractDaemon {
     private final AgentConfiguration agentConf;
     private final String taskId;
     private long auditVersion;
-    private volatile boolean runAtLeastOneTime = false;
     private volatile boolean running = false;
     private final double reserveCoefficient = 0.8;
-    private long finishedInstanceCount = 0;
 
     private class InstancePrintStat {
 
@@ -165,12 +165,16 @@ public class InstanceManager extends AbstractDaemon {
             Thread.currentThread().setName("instance-manager-core-" + taskId);
             running = true;
             while (isRunnable()) {
+                long currentTime = AgentUtils.getCurrentTime();
                 try {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
                     printInstanceState();
                     dealWithActionQueue();
                     dealWithAddActionQueue();
-                    keepPaceWithStore();
+                    if (currentTime - lastTraverseTime > 
KEEP_PACE_INTERVAL_MS) {
+                        keepPaceWithStore();
+                        lastTraverseTime = currentTime;
+                    }
                     String inlongGroupId = taskFromStore.getInlongGroupId();
                     String inlongStreamId = taskFromStore.getInlongStreamId();
                     
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, 
inlongStreamId,
@@ -179,7 +183,6 @@ public class InstanceManager extends AbstractDaemon {
                     LOGGER.error("coreThread error: ", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
                 }
-                runAtLeastOneTime = true;
             }
             running = false;
         };
@@ -356,7 +359,6 @@ public class InstanceManager extends AbstractDaemon {
         deleteFromMemory(profile.getInstanceId());
         LOGGER.info("finished instance state {} taskId {} instanceId {}", 
profile.getState(),
                 profile.getTaskId(), profile.getInstanceId());
-        finishedInstanceCount++;
     }
 
     private void deleteInstance(String instanceId) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 3167b850e1..b82e399c81 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.agent.core.task;
 
 import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.store.InstanceStore;
@@ -50,8 +52,8 @@ public class OffsetManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OffsetManager.class);
     public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
-    public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
-    public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
+    public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000;
+    public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
     private static volatile OffsetManager offsetManager = null;
     private final OffsetStore offsetStore;
     private final InstanceStore instanceStore;
@@ -161,8 +163,7 @@ public class OffsetManager extends AbstractDaemon {
                     }
                 }
             }
-            long expireTime = DateTransUtils.calcOffset(
-                    DB_INSTANCE_EXPIRE_CYCLE_COUNT + 
instanceFromDb.getCycleUnit());
+            long expireTime = 
Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) + 
TWO_HOUR_TIMEOUT_INTERVAL;
             if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > 
expireTime) {
                 cleanCount.getAndIncrement();
                 LOGGER.info("instance has expired, delete from instance store 
dataTime {} taskId {} instanceId {}",
@@ -189,4 +190,26 @@ public class OffsetManager extends AbstractDaemon {
     public void stop() throws Exception {
 
     }
+
+    public static long getScanCycleRange(String cycleUnit) {
+        if 
(AgentConfiguration.getAgentConf().hasKey(AgentConstants.AGENT_SCAN_RANGE)) {
+            String range = 
AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_SCAN_RANGE);
+            return DateTransUtils.calcOffset(range + cycleUnit);
+        }
+        switch (cycleUnit) {
+            case AgentUtils.DAY: {
+                return 
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_DAY + 
cycleUnit);
+            }
+            case AgentUtils.HOUR:
+            case AgentUtils.HOUR_LOW_CASE: {
+                return 
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_HOUR + 
cycleUnit);
+            }
+            case AgentUtils.MINUTE: {
+                return 
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_MINUTE + 
cycleUnit);
+            }
+            default: {
+                return 
DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE + cycleUnit);
+            }
+        }
+    }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 069932711c..92bfaa427f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -29,7 +29,6 @@ import org.apache.inlong.agent.message.file.ProxyMessage;
 import org.apache.inlong.agent.message.file.SenderMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 
@@ -66,7 +65,7 @@ public class ProxySink extends AbstractSink {
             new SynchronousQueue<>(),
             new AgentThreadFactory("proxy-sink"));
     private MessageFilter messageFilter;
-    private SenderManager senderManager;
+    private Sender sender;
     private byte[] fieldSplitter;
     private volatile boolean shutdown = false;
     private volatile boolean running = false;
@@ -159,7 +158,7 @@ public class ProxySink extends AbstractSink {
             if (senderMessage == null) {
                 continue;
             }
-            senderManager.sendBatch(senderMessage);
+            sender.sendBatch(senderMessage);
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
                 lastPrintTime = AgentUtils.getCurrentTime();
                 LOGGER.info("send groupId {}, streamId {}, message size {}, 
taskId {}, "
@@ -178,9 +177,9 @@ public class ProxySink extends AbstractSink {
                 StandardCharsets.UTF_8);
         sourceName = profile.getInstanceId();
         offsetManager = OffsetManager.getInstance();
-        senderManager = new SenderManager(profile, inlongGroupId, sourceName);
+        sender = new Sender(profile, inlongGroupId, sourceName);
         try {
-            senderManager.Start();
+            sender.Start();
             EXECUTOR_SERVICE.execute(coreThread());
             EXECUTOR_SERVICE.execute(flushOffset());
             inited = true;
@@ -200,7 +199,7 @@ public class ProxySink extends AbstractSink {
         }
         Long start = AgentUtils.getCurrentTime();
         shutdown = true;
-        senderManager.Stop();
+        sender.Stop();
         LOGGER.info("destroy proxySink, wait for sender close {} ms instance 
{}", AgentUtils.getCurrentTime() - start,
                 profile.getInstanceId());
         start = AgentUtils.getCurrentTime();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
similarity index 98%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
index 76593b7512..3195ee45c2 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.sinks.filecollect;
+package org.apache.inlong.agent.plugin.sinks;
 
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -68,9 +68,9 @@ import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
 /**
  * proxy client
  */
-public class SenderManager {
+public class Sender {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
     private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance();
     public static final int RESEND_QUEUE_WAIT_MS = 10;
     // cache for group and sender list, share the map cross agent lifecycle.
@@ -112,7 +112,7 @@ public class SenderManager {
     private static final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
     private long auditVersion;
 
-    public SenderManager(InstanceProfile profile, String inlongGroupId, String 
sourcePath) {
+    public Sender(InstanceProfile profile, String inlongGroupId, String 
sourcePath) {
         this.profile = profile;
         auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
         managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index 4c7b729a4d..49e45ba751 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.plugin.task.logcollection;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskAction;
 import org.apache.inlong.agent.plugin.task.AbstractTask;
 import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
@@ -44,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 public abstract class LogAbstractTask extends AbstractTask {
 
     private static final int INSTANCE_QUEUE_CAPACITY = 10;
-    public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+    public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogAbstractTask.class);
     protected boolean retry;
     protected BlockingQueue<InstanceProfile> instanceQueue;
@@ -207,10 +208,13 @@ public abstract class LogAbstractTask extends 
AbstractTask {
         for (Map.Entry<String, Map<String, InstanceProfile>> entry : 
eventMap.entrySet()) {
             /* If the data time of the event is within 2 days before (after) 
the current time, it is valid */
             String dataTime = entry.getKey();
-            if (!DateUtils.isValidCreationTime(dataTime, 
DAY_TIMEOUT_INTERVAL)) {
+            if (!DateUtils.isValidCreationTime(dataTime,
+                    
Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit()))
+                            + ONE_HOUR_TIMEOUT_INTERVAL)) {
                 /* Remove it from memory map. */
                 eventMap.remove(dataTime);
-                LOGGER.warn("remove too old event from event map. dataTime 
{}", dataTime);
+                LOGGER.warn("remove too old event from event map taskId {} 
dataTime {}", taskProfile.getTaskId(),
+                        dataTime);
             }
         }
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
index ecddccce9f..4ededea940 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.utils.regex;
 
+import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.utils.DateTransUtils;
 
 import org.slf4j.Logger;
@@ -29,7 +30,6 @@ import java.util.List;
 public class Scanner {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(Scanner.class);
-    public static final String SCAN_CYCLE_RANCE = "-2";
 
     public static class TimeRange {
 
@@ -87,9 +87,8 @@ public class Scanner {
             boolean isRetry) {
         if (!isRetry) {
             long currentTime = System.currentTimeMillis();
-            // only scan two cycle, like two hours or two days
-            long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + 
cycleUnit);
-            startTime = currentTime + offset + 
DateTransUtils.calcOffset(timeOffset);
+            startTime =
+                    currentTime + OffsetManager.getScanCycleRange(cycleUnit) + 
DateTransUtils.calcOffset(timeOffset);
             endTime = currentTime + DateTransUtils.calcOffset(timeOffset);
         }
         return new TimeRange(startTime, endTime);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
index 56f46d74a0..deee04f1c9 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager;
+import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
@@ -40,12 +40,12 @@ public class KafkaSinkTest {
     private static MockSink kafkaSink;
     private static InstanceProfile profile;
     private static AgentBaseTestsHelper helper;
-    private static final ClassLoader LOADER = 
TestSenderManager.class.getClassLoader();
+    private static final ClassLoader LOADER = 
TestSender.class.getClassLoader();
 
     @BeforeClass
     public static void setUp() throws Exception {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
-        helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+        helper = new 
AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
index c8cf365850..eccbfb35fd 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
-import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager;
+import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
@@ -40,12 +40,12 @@ public class PulsarSinkTest {
     private static MockSink pulsarSink;
     private static InstanceProfile profile;
     private static AgentBaseTestsHelper helper;
-    private static final ClassLoader LOADER = 
TestSenderManager.class.getClassLoader();
+    private static final ClassLoader LOADER = 
TestSender.class.getClassLoader();
 
     @BeforeClass
     public static void setUp() throws Exception {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
-        helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+        helper = new 
AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java
similarity index 89%
rename from inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java
index 4e068f5930..5958719c15 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.message.file.OffsetAckInfo;
 import org.apache.inlong.agent.message.file.SenderMessage;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.sinks.Sender;
 import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
@@ -52,12 +53,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(SenderManager.class)
+@PrepareForTest(Sender.class)
 @PowerMockIgnore({"javax.management.*"})
-public class TestSenderManager {
+public class TestSender {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestSenderManager.class);
-    private static final ClassLoader LOADER = 
TestSenderManager.class.getClassLoader();
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestSender.class);
+    private static final ClassLoader LOADER = 
TestSender.class.getClassLoader();
     private static AgentBaseTestsHelper helper;
     private static InstanceProfile profile;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
@@ -69,7 +70,7 @@ public class TestSenderManager {
     @BeforeClass
     public static void setup() {
         String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
-        helper = new 
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+        helper = new 
AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile =
                 helper.getFileTaskProfile(1, pattern, "csv", false, "", "", 
TaskStateEnum.RUNNING, "D",
@@ -88,17 +89,17 @@ public class TestSenderManager {
         List<MsgSendCallback> cbList = new ArrayList<>();
         try {
             profile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(profile.getInstanceId()));
-            SenderManager senderManager = PowerMockito.spy(new 
SenderManager(profile, "inlongGroupId", "sourceName"));
-            PowerMockito.doNothing().when(senderManager, 
"createMessageSender");
+            Sender sender = PowerMockito.spy(new Sender(profile, 
"inlongGroupId", "sourceName"));
+            PowerMockito.doNothing().when(sender, "createMessageSender");
 
             PowerMockito.doAnswer(invocation -> {
                 MsgSendCallback cb = invocation.getArgument(0);
                 cbList.add(cb);
                 return null;
-            }).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
+            }).when(sender, "asyncSendByMessageSender", Mockito.any(),
                     Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.anyLong(), Mockito.any(),
                     Mockito.any(), Mockito.anyBoolean());
-            senderManager.Start();
+            sender.Start();
             Long offset = 0L;
             List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>();
             for (int i = 0; i < 10; i++) {
@@ -112,7 +113,7 @@ public class TestSenderManager {
                 }
                 SenderMessage senderMessage = new SenderMessage("taskId", 
"instanceId", "groupId", "streamId", bodyList,
                         AgentUtils.getCurrentTime(), null, ackInfoList);
-                senderManager.sendBatch(senderMessage);
+                sender.sendBatch(senderMessage);
             }
             Assert.assertTrue(cbList.size() == 10);
             for (int i = 0; i < 5; i++) {
diff --git a/pom.xml b/pom.xml
index 4194b90095..01a88cf4d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -207,6 +207,7 @@
         <libfb303.version>0.9.3</libfb303.version>
         <apache.thrift.version>0.14.1</apache.thrift.version>
         <aircompressor.version>0.27</aircompressor.version>
+        <json.smart.version>2.5.1</json.smart.version>
     </properties>
 
     <dependencyManagement>
@@ -1269,6 +1270,11 @@
                 <artifactId>tencentcloud-sdk-java-cls</artifactId>
                 <version>${tencentcloud-api.version}</version>
             </dependency>
+            <dependency>
+                <groupId>net.minidev</groupId>
+                <artifactId>json-smart</artifactId>
+                <version>${json.smart.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

Reply via email to