This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new ec4c5789c7 [INLONG-11562][Agent] Modify the MemoryManager class to support adding semaphore (#11563)
ec4c5789c7 is described below

commit ec4c5789c746c3c3b86c55c2389a43c7a9bb66fe
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Sun Dec 1 16:09:17 2024 +0800

    [INLONG-11562][Agent] Modify the MemoryManager class to support adding semaphore (#11563)
---
 .../inlong/agent/core/task/MemoryManager.java | 25 ++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
index d785effdac..b262e171e9 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
@@ -23,6 +23,8 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 
@@ -44,6 +46,7 @@ public class MemoryManager {
     private ConcurrentHashMap<String, Semaphore> semaphoreMap = new ConcurrentHashMap<>();
     private ConcurrentHashMap<String, Long> lastPrintTime = new ConcurrentHashMap<>();
     private static final int PRINT_INTERVAL_MS = 1000;
+    private Set<String> defaultSemaphoreTypes = new HashSet<>();
 
     private MemoryManager() {
         this.conf = AgentConfiguration.getAgentConf();
@@ -52,16 +55,19 @@ public class MemoryManager {
                 conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
         lastPrintTime.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, 0L);
+        defaultSemaphoreTypes.add(AGENT_GLOBAL_READER_SOURCE_PERMIT);
 
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
         lastPrintTime.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, 0L);
+        defaultSemaphoreTypes.add(AGENT_GLOBAL_READER_QUEUE_PERMIT);
 
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
         lastPrintTime.put(AGENT_GLOBAL_WRITER_PERMIT, 0L);
+        defaultSemaphoreTypes.add(AGENT_GLOBAL_WRITER_PERMIT);
     }
 
     /**
@@ -78,6 +84,20 @@ public class MemoryManager {
         return memoryManager;
     }
 
+    public void addSemaphore(String semaphoreType, int permit) {
+        if (semaphoreMap.containsKey(semaphoreType)) {
+            return;
+        }
+        synchronized (MemoryManager.class) {
+            if (semaphoreMap.containsKey(semaphoreType)) {
+                return;
+            }
+            Semaphore semaphore = new Semaphore(permit);
+            semaphoreMap.put(semaphoreType, semaphore);
+            lastPrintTime.put(semaphoreType, 0L);
+        }
+    }
+
     public boolean tryAcquire(String semaphoreName, int permit) {
         Semaphore semaphore = semaphoreMap.get(semaphoreName);
         if (semaphore == null) {
@@ -123,5 +143,10 @@ public class MemoryManager {
         printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT, "printAll");
         printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT, "printAll");
         printDetail(AGENT_GLOBAL_WRITER_PERMIT, "printAll");
+        semaphoreMap.entrySet().forEach(entry -> {
+            if (!defaultSemaphoreTypes.contains(entry.getKey())) {
+                printDetail(entry.getKey(), "printAll");
+            }
+        });
     }
 }