This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 24bdfc1f9e [INLONG-9969][Agent] Release the memory semaphore of the source only when the data is placed in the queue (#9971) 24bdfc1f9e is described below commit 24bdfc1f9eec5f8ea6cf5551c3085c0886ee3d5c Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Apr 11 11:52:30 2024 +0800 [INLONG-9969][Agent] Release the memory semaphore of the source only when the data is placed in the queue (#9971) --- .../java/org/apache/inlong/agent/plugin/sources/KafkaSource.java | 7 ++++--- .../java/org/apache/inlong/agent/plugin/sources/LogFileSource.java | 3 ++- .../java/org/apache/inlong/agent/plugin/sources/PulsarSource.java | 7 ++++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java index fa9034c6bc..62aa87433b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java @@ -94,7 +94,7 @@ public class KafkaSource extends AbstractSource { 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("kafka-source")); - private BlockingQueue<KafkaSource.SourceData> queue; + private BlockingQueue<SourceData> queue; public InstanceProfile profile; private int maxPackSize; private String taskId; @@ -213,13 +213,13 @@ public class KafkaSource extends AbstractSource { break; } ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(1000)); - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); if (records.isEmpty()) { if (queue.isEmpty()) { emptyCount.incrementAndGet(); } else { emptyCount.set(0); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); AgentUtils.silenceSleepInSeconds(1); continue; } @@ -234,6 +234,7 @@ public class KafkaSource extends AbstractSource { putIntoQueue(sourceData); offset = record.offset(); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); kafkaConsumer.commitSync(); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { @@ -298,7 +299,7 @@ public class KafkaSource extends AbstractSource { @Override public Message read() { - KafkaSource.SourceData sourceData = null; + SourceData sourceData = null; try { sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 3ba0f28f9c..9f83f6fc06 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -476,13 +476,13 @@ public class LogFileSource extends AbstractSource { } catch (IOException e) { LOGGER.error("readFromPos error: ", e); } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (lines.isEmpty()) { if (queue.isEmpty()) { emptyCount++; } else { emptyCount = 0; } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); AgentUtils.silenceSleepInSeconds(1); continue; } @@ -494,6 +494,7 @@ public class LogFileSource extends AbstractSource { } putIntoQueue(lines.get(i)); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { lastPrintTime = AgentUtils.getCurrentTime(); LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index ebd8495d9f..c64653e2a3 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -185,13 +185,13 @@ public class PulsarSource extends AbstractSource { break; } org.apache.pulsar.client.api.Message<byte[]> message = consumer.receive(0, TimeUnit.MILLISECONDS); - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); if (ObjectUtils.isEmpty(message)) { if (queue.isEmpty()) { emptyCount.incrementAndGet(); } else { emptyCount.set(0); } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); AgentUtils.silenceSleepInSeconds(1); continue; } @@ -203,6 +203,7 @@ public class PulsarSource extends AbstractSource { break; } putIntoQueue(sourceData); + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN); consumer.acknowledge(message); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { @@ -260,7 +261,7 @@ public class PulsarSource extends AbstractSource { @Override public Message read() { - PulsarSource.SourceData sourceData = null; + SourceData sourceData = null; try { sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -274,7 +275,7 @@ public class PulsarSource extends AbstractSource { return finalMsg; } - private Message createMessage(PulsarSource.SourceData sourceData) { + private Message createMessage(SourceData sourceData) { String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map<String, String> header = new HashMap<>(); header.put(PROXY_KEY_DATA, proxyPartitionKey);