This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 24bdfc1f9e [INLONG-9969][Agent] Release the memory semaphore of the source only when the data is placed in the queue (#9971)
24bdfc1f9e is described below

commit 24bdfc1f9eec5f8ea6cf5551c3085c0886ee3d5c
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Apr 11 11:52:30 2024 +0800

    [INLONG-9969][Agent] Release the memory semaphore of the source only when the data is placed in the queue (#9971)
---
 .../java/org/apache/inlong/agent/plugin/sources/KafkaSource.java   | 7 ++++---
 .../java/org/apache/inlong/agent/plugin/sources/LogFileSource.java | 3 ++-
 .../java/org/apache/inlong/agent/plugin/sources/PulsarSource.java  | 7 ++++---
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index fa9034c6bc..62aa87433b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -94,7 +94,7 @@ public class KafkaSource extends AbstractSource {
             1L, TimeUnit.SECONDS,
             new SynchronousQueue<>(),
             new AgentThreadFactory("kafka-source"));
-    private BlockingQueue<KafkaSource.SourceData> queue;
+    private BlockingQueue<SourceData> queue;
     public InstanceProfile profile;
     private int maxPackSize;
     private String taskId;
@@ -213,13 +213,13 @@ public class KafkaSource extends AbstractSource {
                 break;
             }
             ConsumerRecords<String, byte[]> records = 
kafkaConsumer.poll(Duration.ofMillis(1000));
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
             if (records.isEmpty()) {
                 if (queue.isEmpty()) {
                     emptyCount.incrementAndGet();
                 } else {
                     emptyCount.set(0);
                 }
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
                 AgentUtils.silenceSleepInSeconds(1);
                 continue;
             }
@@ -234,6 +234,7 @@ public class KafkaSource extends AbstractSource {
                 putIntoQueue(sourceData);
                 offset = record.offset();
             }
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
             kafkaConsumer.commitSync();
 
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
@@ -298,7 +299,7 @@ public class KafkaSource extends AbstractSource {
 
     @Override
     public Message read() {
-        KafkaSource.SourceData sourceData = null;
+        SourceData sourceData = null;
         try {
             sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 3ba0f28f9c..9f83f6fc06 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -476,13 +476,13 @@ public class LogFileSource extends AbstractSource {
             } catch (IOException e) {
                 LOGGER.error("readFromPos error: ", e);
             }
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
             if (lines.isEmpty()) {
                 if (queue.isEmpty()) {
                     emptyCount++;
                 } else {
                     emptyCount = 0;
                 }
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
                 AgentUtils.silenceSleepInSeconds(1);
                 continue;
             }
@@ -494,6 +494,7 @@ public class LogFileSource extends AbstractSource {
                 }
                 putIntoQueue(lines.get(i));
             }
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
                 lastPrintTime = AgentUtils.getCurrentTime();
                 LOGGER.info("path is {}, linePosition {}, bytePosition is {} 
file len {}, reads lines size {}",
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index ebd8495d9f..c64653e2a3 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -185,13 +185,13 @@ public class PulsarSource extends AbstractSource {
                 break;
             }
             org.apache.pulsar.client.api.Message<byte[]> message = 
consumer.receive(0, TimeUnit.MILLISECONDS);
-            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
             if (ObjectUtils.isEmpty(message)) {
                 if (queue.isEmpty()) {
                     emptyCount.incrementAndGet();
                 } else {
                     emptyCount.set(0);
                 }
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
                 AgentUtils.silenceSleepInSeconds(1);
                 continue;
             }
@@ -203,6 +203,7 @@ public class PulsarSource extends AbstractSource {
                 break;
             }
             putIntoQueue(sourceData);
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
             consumer.acknowledge(message);
 
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
@@ -260,7 +261,7 @@ public class PulsarSource extends AbstractSource {
 
     @Override
     public Message read() {
-        PulsarSource.SourceData sourceData = null;
+        SourceData sourceData = null;
         try {
             sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
@@ -274,7 +275,7 @@ public class PulsarSource extends AbstractSource {
         return finalMsg;
     }
 
-    private Message createMessage(PulsarSource.SourceData sourceData) {
+    private Message createMessage(SourceData sourceData) {
         String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);

Reply via email to