This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 82cc76e41a [INLONG-11752][Agent] Modify the default collection range of data (#11753) 82cc76e41a is described below commit 82cc76e41af429b316241d5dafb0e1474bd054cc Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Feb 13 19:53:48 2025 +0800 [INLONG-11752][Agent] Modify the default collection range of data (#11753) --- .../inlong/agent/constant/AgentConstants.java | 6 +++++ .../agent/core/instance/InstanceManager.java | 14 +++++----- .../inlong/agent/core/task/OffsetManager.java | 31 +++++++++++++++++++--- .../inlong/agent/plugin/sinks/ProxySink.java | 11 ++++---- .../SenderManager.java => Sender.java} | 8 +++--- .../plugin/task/logcollection/LogAbstractTask.java | 10 ++++--- .../inlong/agent/plugin/utils/regex/Scanner.java | 7 +++-- .../inlong/agent/plugin/sinks/KafkaSinkTest.java | 6 ++--- .../inlong/agent/plugin/sinks/PulsarSinkTest.java | 6 ++--- .../{TestSenderManager.java => TestSender.java} | 21 ++++++++------- pom.xml | 6 +++++ 11 files changed, 83 insertions(+), 43 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index f7c0f0d7c8..0c73c12852 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -76,6 +76,12 @@ public class AgentConstants { public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit"; public static final boolean DEFAULT_ENABLE_OOM_EXIT = false; + public static final String AGENT_SCAN_RANGE = "agent.scan.range"; + public static final String DEFAULT_AGENT_SCAN_RANGE = "-2"; + public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2"; + public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10"; + public static final String 
DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600"; + // pulsar sink config public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num"; public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1, diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 3f0a914e90..23c16cabe8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -56,7 +56,9 @@ public class InstanceManager extends AbstractDaemon { public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; public static final int INSTANCE_PRINT_INTERVAL_MS = 10000; public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000; + public static final long KEEP_PACE_INTERVAL_MS = 60 * 1000; private long lastPrintTime = 0; + private long lastTraverseTime = 0; // instance in instance store private final InstanceStore instanceStore; private TaskStore taskStore; @@ -67,7 +69,7 @@ public class InstanceManager extends AbstractDaemon { private final BlockingQueue<InstanceAction> actionQueue; private final BlockingQueue<InstanceAction> addActionQueue; // task thread pool; - private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + private final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), @@ -77,10 +79,8 @@ public class InstanceManager extends AbstractDaemon { private final AgentConfiguration agentConf; private final String taskId; private long auditVersion; - private volatile boolean runAtLeastOneTime = false; private volatile boolean running = false; private final double reserveCoefficient = 0.8; - private long finishedInstanceCount = 0; private class 
InstancePrintStat { @@ -165,12 +165,16 @@ public class InstanceManager extends AbstractDaemon { Thread.currentThread().setName("instance-manager-core-" + taskId); running = true; while (isRunnable()) { + long currentTime = AgentUtils.getCurrentTime(); try { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); printInstanceState(); dealWithActionQueue(); dealWithAddActionQueue(); - keepPaceWithStore(); + if (currentTime - lastTraverseTime > KEEP_PACE_INTERVAL_MS) { + keepPaceWithStore(); + lastTraverseTime = currentTime; + } String inlongGroupId = taskFromStore.getInlongGroupId(); String inlongStreamId = taskFromStore.getInlongStreamId(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId, @@ -179,7 +183,6 @@ public class InstanceManager extends AbstractDaemon { LOGGER.error("coreThread error: ", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); } - runAtLeastOneTime = true; } running = false; }; @@ -356,7 +359,6 @@ public class InstanceManager extends AbstractDaemon { deleteFromMemory(profile.getInstanceId()); LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(), profile.getTaskId(), profile.getInstanceId()); - finishedInstanceCount++; } private void deleteInstance(String instanceId) { diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java index 3167b850e1..b82e399c81 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java @@ -18,9 +18,11 @@ package org.apache.inlong.agent.core.task; import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import 
org.apache.inlong.agent.conf.OffsetProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.store.InstanceStore; @@ -50,8 +52,8 @@ public class OffsetManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class); public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000; - public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100; - public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3"; + public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000; + public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000; private static volatile OffsetManager offsetManager = null; private final OffsetStore offsetStore; private final InstanceStore instanceStore; @@ -161,8 +163,7 @@ public class OffsetManager extends AbstractDaemon { } } } - long expireTime = DateTransUtils.calcOffset( - DB_INSTANCE_EXPIRE_CYCLE_COUNT + instanceFromDb.getCycleUnit()); + long expireTime = Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) + TWO_HOUR_TIMEOUT_INTERVAL; if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) { cleanCount.getAndIncrement(); LOGGER.info("instance has expired, delete from instance store dataTime {} taskId {} instanceId {}", @@ -189,4 +190,26 @@ public class OffsetManager extends AbstractDaemon { public void stop() throws Exception { } + + public static long getScanCycleRange(String cycleUnit) { + if (AgentConfiguration.getAgentConf().hasKey(AgentConstants.AGENT_SCAN_RANGE)) { + String range = AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_SCAN_RANGE); + return DateTransUtils.calcOffset(range + cycleUnit); + } + switch (cycleUnit) { + case AgentUtils.DAY: { + return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_DAY + cycleUnit); + } + case 
AgentUtils.HOUR: + case AgentUtils.HOUR_LOW_CASE: { + return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_HOUR + cycleUnit); + } + case AgentUtils.MINUTE: { + return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_MINUTE + cycleUnit); + } + default: { + return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE + cycleUnit); + } + } + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index 069932711c..92bfaa427f 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -29,7 +29,6 @@ import org.apache.inlong.agent.message.file.ProxyMessage; import org.apache.inlong.agent.message.file.SenderMessage; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.MessageFilter; -import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ThreadUtils; @@ -66,7 +65,7 @@ public class ProxySink extends AbstractSink { new SynchronousQueue<>(), new AgentThreadFactory("proxy-sink")); private MessageFilter messageFilter; - private SenderManager senderManager; + private Sender sender; private byte[] fieldSplitter; private volatile boolean shutdown = false; private volatile boolean running = false; @@ -159,7 +158,7 @@ public class ProxySink extends AbstractSink { if (senderMessage == null) { continue; } - senderManager.sendBatch(senderMessage); + sender.sendBatch(senderMessage); if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) { lastPrintTime = AgentUtils.getCurrentTime(); LOGGER.info("send groupId {}, streamId {}, message size {}, taskId {}, " @@ -178,9 +177,9 @@ public class 
ProxySink extends AbstractSink { StandardCharsets.UTF_8); sourceName = profile.getInstanceId(); offsetManager = OffsetManager.getInstance(); - senderManager = new SenderManager(profile, inlongGroupId, sourceName); + sender = new Sender(profile, inlongGroupId, sourceName); try { - senderManager.Start(); + sender.Start(); EXECUTOR_SERVICE.execute(coreThread()); EXECUTOR_SERVICE.execute(flushOffset()); inited = true; @@ -200,7 +199,7 @@ public class ProxySink extends AbstractSink { } Long start = AgentUtils.getCurrentTime(); shutdown = true; - senderManager.Stop(); + sender.Stop(); LOGGER.info("destroy proxySink, wait for sender close {} ms instance {}", AgentUtils.getCurrentTime() - start, profile.getInstanceId()); start = AgentUtils.getCurrentTime(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java similarity index 98% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java index 76593b7512..3195ee45c2 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/Sender.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.inlong.agent.plugin.sinks.filecollect; +package org.apache.inlong.agent.plugin.sinks; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; @@ -68,9 +68,9 @@ import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; /** * proxy client */ -public class SenderManager { +public class Sender { - private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); public static final int RESEND_QUEUE_WAIT_MS = 10; // cache for group and sender list, share the map cross agent lifecycle. @@ -112,7 +112,7 @@ public class SenderManager { private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); private long auditVersion; - public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) { + public Sender(InstanceProfile profile, String inlongGroupId, String sourcePath) { this.profile = profile; auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION)); managerAddr = agentConf.get(AGENT_MANAGER_ADDR); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java index 4c7b729a4d..49e45ba751 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.plugin.task.logcollection; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.TaskAction; import 
org.apache.inlong.agent.plugin.task.AbstractTask; import org.apache.inlong.agent.plugin.utils.regex.DateUtils; @@ -44,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue; public abstract class LogAbstractTask extends AbstractTask { private static final int INSTANCE_QUEUE_CAPACITY = 10; - public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; + public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000; private static final Logger LOGGER = LoggerFactory.getLogger(LogAbstractTask.class); protected boolean retry; protected BlockingQueue<InstanceProfile> instanceQueue; @@ -207,10 +208,13 @@ public abstract class LogAbstractTask extends AbstractTask { for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) { /* If the data time of the event is within 2 days before (after) the current time, it is valid */ String dataTime = entry.getKey(); - if (!DateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)) { + if (!DateUtils.isValidCreationTime(dataTime, + Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit())) + + ONE_HOUR_TIMEOUT_INTERVAL)) { /* Remove it from memory map. */ eventMap.remove(dataTime); - LOGGER.warn("remove too old event from event map. 
dataTime {}", dataTime); + LOGGER.warn("remove too old event from event map taskId {} dataTime {}", taskProfile.getTaskId(), + dataTime); } } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java index ecddccce9f..4ededea940 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java @@ -17,6 +17,7 @@ package org.apache.inlong.agent.plugin.utils.regex; +import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.utils.DateTransUtils; import org.slf4j.Logger; @@ -29,7 +30,6 @@ import java.util.List; public class Scanner { private static final Logger LOGGER = LoggerFactory.getLogger(Scanner.class); - public static final String SCAN_CYCLE_RANCE = "-2"; public static class TimeRange { @@ -87,9 +87,8 @@ public class Scanner { boolean isRetry) { if (!isRetry) { long currentTime = System.currentTimeMillis(); - // only scan two cycle, like two hours or two days - long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + cycleUnit); - startTime = currentTime + offset + DateTransUtils.calcOffset(timeOffset); + startTime = + currentTime + OffsetManager.getScanCycleRange(cycleUnit) + DateTransUtils.calcOffset(timeOffset); endTime = currentTime + DateTransUtils.calcOffset(timeOffset); } return new TimeRange(startTime, endTime); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java index 56f46d74a0..deee04f1c9 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java +++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java @@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; -import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager; +import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; @@ -40,12 +40,12 @@ public class KafkaSinkTest { private static MockSink kafkaSink; private static InstanceProfile profile; private static AgentBaseTestsHelper helper; - private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader(); + private static final ClassLoader LOADER = TestSender.class.getClassLoader(); @BeforeClass public static void setUp() throws Exception { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); - helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); + helper = new AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java index c8cf365850..eccbfb35fd 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java @@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; import 
org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; -import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager; +import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; @@ -40,12 +40,12 @@ public class PulsarSinkTest { private static MockSink pulsarSink; private static InstanceProfile profile; private static AgentBaseTestsHelper helper; - private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader(); + private static final ClassLoader LOADER = TestSender.class.getClassLoader(); @BeforeClass public static void setUp() throws Exception { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); - helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); + helper = new AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java similarity index 89% rename from inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java rename to inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java index 4e068f5930..5958719c15 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSender.java @@ -24,6 +24,7 @@ import 
org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.message.file.OffsetAckInfo; import org.apache.inlong.agent.message.file.SenderMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.sinks.Sender; import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; @@ -52,12 +53,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @RunWith(PowerMockRunner.class) -@PrepareForTest(SenderManager.class) +@PrepareForTest(Sender.class) @PowerMockIgnore({"javax.management.*"}) -public class TestSenderManager { +public class TestSender { - private static final Logger LOGGER = LoggerFactory.getLogger(TestSenderManager.class); - private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader(); + private static final Logger LOGGER = LoggerFactory.getLogger(TestSender.class); + private static final ClassLoader LOADER = TestSender.class.getClassLoader(); private static AgentBaseTestsHelper helper; private static InstanceProfile profile; private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( @@ -69,7 +70,7 @@ public class TestSenderManager { @BeforeClass public static void setup() { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); - helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); + helper = new AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", @@ -88,17 +89,17 @@ public class TestSenderManager { List<MsgSendCallback> cbList = new ArrayList<>(); try { profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); - 
SenderManager senderManager = PowerMockito.spy(new SenderManager(profile, "inlongGroupId", "sourceName")); - PowerMockito.doNothing().when(senderManager, "createMessageSender"); + Sender sender = PowerMockito.spy(new Sender(profile, "inlongGroupId", "sourceName")); + PowerMockito.doNothing().when(sender, "createMessageSender"); PowerMockito.doAnswer(invocation -> { MsgSendCallback cb = invocation.getArgument(0); cbList.add(cb); return null; - }).when(senderManager, "asyncSendByMessageSender", Mockito.any(), + }).when(sender, "asyncSendByMessageSender", Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); - senderManager.Start(); + sender.Start(); Long offset = 0L; List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -112,7 +113,7 @@ public class TestSenderManager { } SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList, AgentUtils.getCurrentTime(), null, ackInfoList); - senderManager.sendBatch(senderMessage); + sender.sendBatch(senderMessage); } Assert.assertTrue(cbList.size() == 10); for (int i = 0; i < 5; i++) { diff --git a/pom.xml b/pom.xml index 4194b90095..01a88cf4d7 100644 --- a/pom.xml +++ b/pom.xml @@ -207,6 +207,7 @@ <libfb303.version>0.9.3</libfb303.version> <apache.thrift.version>0.14.1</apache.thrift.version> <aircompressor.version>0.27</aircompressor.version> + <json.smart.version>2.5.1</json.smart.version> </properties> <dependencyManagement> @@ -1269,6 +1270,11 @@ <artifactId>tencentcloud-sdk-java-cls</artifactId> <version>${tencentcloud-api.version}</version> </dependency> + <dependency> + <groupId>net.minidev</groupId> + <artifactId>json-smart</artifactId> + <version>${json.smart.version}</version> + </dependency> </dependencies> </dependencyManagement>