This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 58ecec5a9d [INLONG-9190][Agent] Log file source clear buffer queue does not take effect (#9191) 58ecec5a9d is described below commit 58ecec5a9d57e6f00759825daccd2d7199abe5c7 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Wed Nov 1 15:21:33 2023 +0800 [INLONG-9190][Agent] Log file source clear buffer queue does not take effect (#9191) --- .../inlong/agent/plugin/sources/LogFileSource.java | 2 +- .../agent/plugin/sources/TestLogFileSource.java | 36 ++++++++++++++++++---- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index cc9f3724e8..ea0e63c95f 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -511,7 +511,7 @@ public class LogFileSource extends AbstractSource { } private void clearQueue(BlockingQueue<SourceData> queue) { - if (queue != null) { + if (queue == null) { return; } while (queue != null && !queue.isEmpty()) { diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index 6c3257adc9..23b5027812 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -46,9 +46,11 @@ public class TestLogFileSource { private static final Logger LOGGER = LoggerFactory.getLogger(TestLogFileSource.class); private static final ClassLoader LOADER = TestLogFileSource.class.getClassLoader(); - private static LogFileSource source; private static AgentBaseTestsHelper helper; private static final Gson GSON = new Gson(); + private static final String[] check = {"hello line-end-symbol aa", "world line-end-symbol", + "agent line-end-symbol"}; + private static InstanceProfile instanceProfile; @BeforeClass public static void setup() { @@ -57,11 +59,15 @@ public class TestLogFileSource { helper = new AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); - InstanceProfile instanceProfile = taskProfile.createInstanceProfile("", + instanceProfile = taskProfile.createInstanceProfile("", fileName, "20230928"); + + } + + private LogFileSource getSource() { try { instanceProfile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(instanceProfile.getInstanceId())); - source = new LogFileSource(); + LogFileSource source = new LogFileSource(); Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1); Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10); Whitebox.setInternalState(source, "PRINT_INTERVAL_MS", 0); @@ -69,25 +75,31 @@ public class TestLogFileSource { Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1); Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10); source.init(instanceProfile); + return source; } catch (Exception e) { LOGGER.error("source init error {}", e); Assert.assertTrue("source init error", false); } + return null; } @AfterClass public static void teardown() throws Exception { - source.destroy(); helper.teardownAgentHome(); } @Test - public void testTaskManager() { - String[] check = {"hello line-end-symbol aa", "world line-end-symbol", "agent line-end-symbol"}; + public void testLogFileSource() { + testFullRead(); + testCleanQueue(); + } + + private void testFullRead() { int srcLen = 0; for (int i = 0; i < check.length; i++) { srcLen += check[i].getBytes(StandardCharsets.UTF_8).length; } + LogFileSource source = getSource(); await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish()); int cnt = 0; int leftBeforeRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT); @@ -101,9 +113,21 @@ public class TestLogFileSource { msg = source.read(); cnt++; } + source.destroy(); Assert.assertTrue(cnt == 3); Assert.assertTrue(srcLen == readLen); int leftAfterRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT); Assert.assertTrue(leftAfterRead == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT); } + + private void testCleanQueue() { + LogFileSource source = getSource(); + await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish()); + for (int i = 0; i < 2; i++) { + source.read(); + } + source.destroy(); + int leftAfterRead = MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT); + Assert.assertTrue(leftAfterRead == DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT); + } } \ No newline at end of file