This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 381cbe5285 [INLONG-10728][Audit] Add global memory control for the Audit SDK (#10733)
381cbe5285 is described below

commit 381cbe5285deaa71ef4eaea5b5a2abed9eaa6bd6
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Tue Jul 30 09:44:28 2024 +0800

    [INLONG-10728][Audit] Add global memory control for the Audit SDK (#10733)
---
 .../org/apache/inlong/audit/AuditReporterImpl.java | 72 +++++++++++++++-------
 .../apache/inlong/audit/entity/AuditMetric.java    |  6 ++
 .../apache/inlong/audit/send/SenderManager.java    | 35 +++++++++--
 3 files changed, 85 insertions(+), 28 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index b4cc8028f0..e8a38da28f 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -310,33 +310,40 @@ public class AuditReporterImpl implements Serializable {
         }
         long startTime = System.currentTimeMillis();
         LOGGER.info("Audit flush isolate key {} ", isolateKey);
-        manager.checkFailedData();
-        resetStat();
 
-        summaryExpiredStatMap(isolateKey);
-
-        Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> 
iterator = this.preStatMap.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = 
iterator.next();
-            if (entry.getValue().isEmpty()) {
-                LOGGER.info("Remove the key of pre stat map: {},isolate key: 
{} ", entry.getKey(), isolateKey);
-                iterator.remove();
-                continue;
-            }
-            if (entry.getKey() > isolateKey) {
-                continue;
+        try {
+            manager.checkFailedData();
+            resetStat();
+
+            summaryExpiredStatMap(isolateKey);
+
+            Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> 
iterator =
+                    this.preStatMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = 
iterator.next();
+                if (entry.getValue().isEmpty()) {
+                    LOGGER.info("Remove the key of pre stat map: {},isolate 
key: {} ", entry.getKey(), isolateKey);
+                    iterator.remove();
+                    continue;
+                }
+                if (entry.getKey() > isolateKey) {
+                    continue;
+                }
+                summaryPreStatMap(entry.getKey(), entry.getValue());
+                send(entry.getKey());
             }
-            summaryPreStatMap(entry.getKey(), entry.getValue());
-            send(entry.getKey());
 
+            clearExpiredKey(isolateKey);
+        } catch (Exception exception) {
+            LOGGER.error("Flush audit has exception!", exception);
+        } finally {
+            manager.closeSocket();
         }
 
-        clearExpiredKey(isolateKey);
-
-        manager.closeSocket();
-
-        LOGGER.info("Success report {} package, Failed report {} package, 
total {} message, cost: {} ms",
+        LOGGER.info(
+                "Success report {} package, Failed report {} package, total {} 
message, memory size {}, cost: {} ms",
                 auditMetric.getSuccessPack(), auditMetric.getFailedPack(), 
auditMetric.getTotalMsg(),
+                auditMetric.getMemorySize(),
                 System.currentTimeMillis() - startTime);
 
         auditMetric.reset();
@@ -475,12 +482,25 @@ public class AuditReporterImpl implements Serializable {
         for (Map.Entry<String, StatInfo> entry : 
summaryStatMap.get(isolateKey).entrySet()) {
             // Entry key order: logTime inlongGroupID inlongStreamID auditID 
auditTag auditVersion
             String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
-            long logTime = Long.parseLong(keyArray[0]) * PERIOD;
+            if (keyArray.length < 6) {
+                LOGGER.error("Number of keys {} <6", keyArray.length);
+                continue;
+            }
+
+            long logTime;
+            long auditVersion;
+            try {
+                logTime = Long.parseLong(keyArray[0]) * PERIOD;
+                auditVersion = Long.parseLong(keyArray[5]);
+            } catch (NumberFormatException numberFormatException) {
+                LOGGER.error("Failed to parse long from string", 
numberFormatException);
+                continue;
+            }
+
             String inlongGroupID = keyArray[1];
             String inlongStreamID = keyArray[2];
             String auditID = keyArray[3];
             String auditTag = keyArray[4];
-            long auditVersion = Long.parseLong(keyArray[5]);
             StatInfo value = entry.getValue();
             AuditApi.AuditMessageBody msgBody = 
AuditApi.AuditMessageBody.newBuilder()
                     .setLogTs(logTime)
@@ -495,6 +515,8 @@ public class AuditReporterImpl implements Serializable {
                     .build();
             requestBuild.addMsgBody(msgBody);
 
+            auditMetric.addMemorySize(msgBody.toByteArray().length);
+
             if (dataId++ >= BATCH_NUM) {
                 dataId = 0;
                 packageId++;
@@ -615,4 +637,8 @@ public class AuditReporterImpl implements Serializable {
     public void setUpdateInterval(int updateInterval) {
         ProxyManager.getInstance().setUpdateInterval(updateInterval);
     }
+
+    public void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
+        SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
+    }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
index 000642bd64..05ac91a002 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
@@ -29,6 +29,7 @@ public class AuditMetric {
     private Long successPack = 0L;
     private Long failedPack = 0L;
     private Long totalMsg = 0L;
+    private Long memorySize = 0L;
 
     public void addSuccessPack(long successPack) {
         this.successPack += successPack;
@@ -42,9 +43,14 @@ public class AuditMetric {
         this.totalMsg += totalMsg;
     }
 
+    public void addMemorySize(long memorySize) {
+        this.memorySize += memorySize;
+    }
+
     public void reset() {
         successPack = 0L;
         failedPack = 0L;
         totalMsg = 0L;
+        memorySize = 0L;
     }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index 4da90f9f0d..f941cbae6d 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -37,6 +37,7 @@ import java.net.Socket;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Audit sender manager
@@ -50,6 +51,12 @@ public class SenderManager {
     private Socket socket = new Socket();
     private static final int PACKAGE_HEADER_LEN = 4;
     private static final int MAX_RESPONSE_LENGTH = 32 * 1024;
+    private static final AtomicLong globalAuditMemory = new AtomicLong(0);
+    private static long maxGlobalAuditMemory = 200 * 1024 * 1024;
+
+    public static void setMaxGlobalAuditMemory(long maxGlobalAuditMemory) {
+        SenderManager.maxGlobalAuditMemory = maxGlobalAuditMemory;
+    }
 
     public SenderManager(AuditConfig config) {
         auditConfig = config;
@@ -179,11 +186,22 @@ public class SenderManager {
         if (failedDataMap.isEmpty()) {
             checkAuditFile();
         }
-        if (failedDataMap.size() > auditConfig.getMaxCacheRow()) {
-            LOGGER.info("Failed cache size: {} > {}", failedDataMap.size(), 
auditConfig.getMaxCacheRow());
+
+        long failedDataSize = getFailedDataSize();
+        globalAuditMemory.addAndGet(failedDataSize);
+
+        if (failedDataMap.size() > auditConfig.getMaxCacheRow()
+                || globalAuditMemory.get() > maxGlobalAuditMemory) {
+            LOGGER.warn("Failed cache [size: {}, threshold {}], [count {}, 
threshold: {}]",
+                    globalAuditMemory.get(), maxGlobalAuditMemory,
+                    failedDataMap.size(), auditConfig.getMaxCacheRow());
+
             writeLocalFile();
+
             failedDataMap.clear();
         }
+
+        globalAuditMemory.addAndGet(-failedDataSize);
     }
 
     /**
@@ -255,10 +273,9 @@ public class SenderManager {
                     .readObject();
 
             for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
-                if (failedDataMap.size() < (auditConfig.getMaxCacheRow() / 2)) 
{
-                    failedDataMap.putIfAbsent(entry.getKey(), 
entry.getValue());
+                if (!sendData(entry.getValue().getDataByte())) {
+                    LOGGER.error("Local file recovery failed: {}", 
entry.getValue());
                 }
-                sendData(entry.getValue().getDataByte());
                 sleep();
             }
         } catch (IOException | ClassNotFoundException e) {
@@ -294,4 +311,12 @@ public class SenderManager {
     public void setAuditConfig(AuditConfig config) {
         auditConfig = config;
     }
+
+    private long getFailedDataSize() {
+        long dataSize = 0;
+        for (AuditData auditData : failedDataMap.values()) {
+            dataSize += auditData.getDataByte().length;
+        }
+        return dataSize;
+    }
 }

Reply via email to