This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 58ecec5a9d [INLONG-9190][Agent] Log file source clear buffer queue does not take effect (#9191)
58ecec5a9d is described below

commit 58ecec5a9d57e6f00759825daccd2d7199abe5c7
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 1 15:21:33 2023 +0800

    [INLONG-9190][Agent] Log file source clear buffer queue does not take effect (#9191)
---
 .../inlong/agent/plugin/sources/LogFileSource.java |  2 +-
 .../agent/plugin/sources/TestLogFileSource.java    | 36 ++++++++++++++++++----
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index cc9f3724e8..ea0e63c95f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -511,7 +511,7 @@ public class LogFileSource extends AbstractSource {
     }
 
     private void clearQueue(BlockingQueue<SourceData> queue) {
-        if (queue != null) {
+        if (queue == null) {
             return;
         }
         while (queue != null && !queue.isEmpty()) {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 6c3257adc9..23b5027812 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -46,9 +46,11 @@ public class TestLogFileSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TestLogFileSource.class);
     private static final ClassLoader LOADER = 
TestLogFileSource.class.getClassLoader();
-    private static LogFileSource source;
     private static AgentBaseTestsHelper helper;
     private static final Gson GSON = new Gson();
+    private static final String[] check = {"hello line-end-symbol aa", "world 
line-end-symbol",
+            "agent line-end-symbol"};
+    private static InstanceProfile instanceProfile;
 
     @BeforeClass
     public static void setup() {
@@ -57,11 +59,15 @@ public class TestLogFileSource {
         helper = new 
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 
0L, TaskStateEnum.RUNNING);
-        InstanceProfile instanceProfile = taskProfile.createInstanceProfile("",
+        instanceProfile = taskProfile.createInstanceProfile("",
                 fileName, "20230928");
+
+    }
+
+    private LogFileSource getSource() {
         try {
             instanceProfile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(instanceProfile.getInstanceId()));
-            source = new LogFileSource();
+            LogFileSource source = new LogFileSource();
             Whitebox.setInternalState(source, "BATCH_READ_LINE_COUNT", 1);
             Whitebox.setInternalState(source, "BATCH_READ_LINE_TOTAL_LEN", 10);
             Whitebox.setInternalState(source, "PRINT_INTERVAL_MS", 0);
@@ -69,25 +75,31 @@ public class TestLogFileSource {
             Whitebox.setInternalState(source, "FINISH_READ_MAX_COUNT", 1);
             Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
             source.init(instanceProfile);
+            return source;
         } catch (Exception e) {
             LOGGER.error("source init error {}", e);
             Assert.assertTrue("source init error", false);
         }
+        return null;
     }
 
     @AfterClass
     public static void teardown() throws Exception {
-        source.destroy();
         helper.teardownAgentHome();
     }
 
     @Test
-    public void testTaskManager() {
-        String[] check = {"hello line-end-symbol aa", "world line-end-symbol", 
"agent line-end-symbol"};
+    public void testLogFileSource() {
+        testFullRead();
+        testCleanQueue();
+    }
+
+    private void testFullRead() {
         int srcLen = 0;
         for (int i = 0; i < check.length; i++) {
             srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
         }
+        LogFileSource source = getSource();
         await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
         int cnt = 0;
         int leftBeforeRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
@@ -101,9 +113,21 @@ public class TestLogFileSource {
             msg = source.read();
             cnt++;
         }
+        source.destroy();
         Assert.assertTrue(cnt == 3);
         Assert.assertTrue(srcLen == readLen);
         int leftAfterRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
         Assert.assertTrue(leftAfterRead == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
     }
+
+    private void testCleanQueue() {
+        LogFileSource source = getSource();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+        for (int i = 0; i < 2; i++) {
+            source.read();
+        }
+        source.destroy();
+        int leftAfterRead = 
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+        Assert.assertTrue(leftAfterRead == 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
+    }
 }
\ No newline at end of file

Reply via email to