This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec4c5789c7 [INLONG-11562][Agent] Modify the MemoryManager class to support adding semaphore (#11563)
ec4c5789c7 is described below

commit ec4c5789c746c3c3b86c55c2389a43c7a9bb66fe
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Sun Dec 1 16:09:17 2024 +0800

    [INLONG-11562][Agent] Modify the MemoryManager class to support adding semaphore (#11563)
---
 .../inlong/agent/core/task/MemoryManager.java      | 25 ++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
index d785effdac..b262e171e9 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
@@ -23,6 +23,8 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 
@@ -44,6 +46,7 @@ public class MemoryManager {
     private ConcurrentHashMap<String, Semaphore> semaphoreMap = new 
ConcurrentHashMap<>();
     private ConcurrentHashMap<String, Long> lastPrintTime = new 
ConcurrentHashMap<>();
     private static final int PRINT_INTERVAL_MS = 1000;
+    private Set<String> defaultSemaphoreTypes = new HashSet<>();
 
     private MemoryManager() {
         this.conf = AgentConfiguration.getAgentConf();
@@ -52,16 +55,19 @@ public class MemoryManager {
                 conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
         lastPrintTime.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, 0L);
+        defaultSemaphoreTypes.add(AGENT_GLOBAL_READER_SOURCE_PERMIT);
 
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
         lastPrintTime.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, 0L);
+        defaultSemaphoreTypes.add(AGENT_GLOBAL_READER_QUEUE_PERMIT);
 
         semaphore = new Semaphore(
                 conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
         semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
         lastPrintTime.put(AGENT_GLOBAL_WRITER_PERMIT, 0L);
+        defaultSemaphoreTypes.add(AGENT_GLOBAL_WRITER_PERMIT);
     }
 
     /**
@@ -78,6 +84,20 @@ public class MemoryManager {
         return memoryManager;
     }
 
+    public void addSemaphore(String semaphoreType, int permit) {
+        if (semaphoreMap.containsKey(semaphoreType)) {
+            return;
+        }
+        synchronized (MemoryManager.class) {
+            if (semaphoreMap.containsKey(semaphoreType)) {
+                return;
+            }
+            Semaphore semaphore = new Semaphore(permit);
+            semaphoreMap.put(semaphoreType, semaphore);
+            lastPrintTime.put(semaphoreType, 0L);
+        }
+    }
+
     public boolean tryAcquire(String semaphoreName, int permit) {
         Semaphore semaphore = semaphoreMap.get(semaphoreName);
         if (semaphore == null) {
@@ -123,5 +143,10 @@ public class MemoryManager {
         printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT, "printAll");
         printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT, "printAll");
         printDetail(AGENT_GLOBAL_WRITER_PERMIT, "printAll");
+        semaphoreMap.entrySet().forEach(entry -> {
+            if (!defaultSemaphoreTypes.contains(entry.getKey())) {
+                printDetail(entry.getKey(), "printAll");
+            }
+        });
     }
 }

Reply via email to