This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 18b0bf686f [INLONG-9904][Audit] SDK support checkpoint feature  for 
Flink job (#9905)
18b0bf686f is described below

commit 18b0bf686fe4339f4c9824c3e0beae202ae73e45
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Mon Apr 1 23:51:11 2024 +0800

    [INLONG-9904][Audit] SDK support checkpoint feature  for Flink job (#9905)
---
 .../inlong/agent/metrics/audit/AuditUtils.java     |   2 +-
 .../org/apache/inlong/audit/AuditReporterImpl.java | 296 ++++++++++++++++-----
 .../apache/inlong/audit/send/SenderManager.java    |  24 +-
 .../apache/inlong/audit/util/AuditDimensions.java  |  99 +++++++
 .../org/apache/inlong/audit/util/AuditValues.java  |  58 ++++
 inlong-audit/sql/apache_inlong_audit.sql           |   1 +
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |   2 +-
 .../sort/standalone/metrics/audit/AuditUtils.java  |   2 +-
 .../inlong/sort/base/metric/SinkMetricData.java    |   2 +-
 .../inlong/sort/base/metric/SourceMetricData.java  |   2 +-
 .../server/broker/stats/audit/AuditUtils.java      |   2 +-
 11 files changed, 402 insertions(+), 88 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index fc6f4d90c3..e4cc7691c8 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -107,6 +107,6 @@ public class AuditUtils {
         if (!IS_AUDIT) {
             return;
         }
-        AuditOperator.getInstance().send();
+        AuditOperator.getInstance().flush();
     }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index 10c3913dd2..f0f44207d5 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -21,6 +21,8 @@ import org.apache.inlong.audit.loader.SocketAddressListLoader;
 import org.apache.inlong.audit.protocol.AuditApi;
 import org.apache.inlong.audit.send.SenderManager;
 import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.audit.util.AuditDimensions;
+import org.apache.inlong.audit.util.AuditValues;
 import org.apache.inlong.audit.util.Config;
 import org.apache.inlong.audit.util.StatInfo;
 
@@ -33,6 +35,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
@@ -40,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
@@ -54,20 +58,49 @@ public class AuditReporterImpl implements Serializable {
     private static final int BATCH_NUM = 100;
     private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
     private static final int PERIOD = 1000 * 60;
-    private final ConcurrentHashMap<String, StatInfo> countMap = new 
ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, StatInfo> threadCountMap = new 
ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new 
ConcurrentHashMap<>();
-    private final List<String> deleteKeyList = new ArrayList<>();
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> 
preStatMap =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> 
summaryStatMap =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> 
expiredStatMap =
+            new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, List<String>> expiredKeyList = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, Long> flushTime = new 
ConcurrentHashMap<>();
     private final Config config = new Config();
     private int packageId = 1;
     private int dataId = 0;
     private boolean initialized = false;
     private SenderManager manager;
+    private AtomicInteger flushStat = new AtomicInteger(0);
 
     private final ScheduledExecutorService timeoutExecutor = 
Executors.newSingleThreadScheduledExecutor();
     private AuditConfig auditConfig = null;
     private SocketAddressListLoader loader = null;
 
+    // Resource isolation key, used in checkpoint and other scenarios. Defaults to 0.
+    private static final long DEFAULT_ISOLATE_KEY = 0;
+    private int flushStatThreshold = 100;
+    private boolean autoFlush = true;
+
+    /**
+     * Set stat threshold
+     *
+     * @param flushStatThreshold
+     */
+    public void setFlushStatThreshold(int flushStatThreshold) {
+        this.flushStatThreshold = flushStatThreshold;
+    }
+
+    /**
+     * Enables or disables the periodic automatic flush of the default isolate key.
+     * Pass false when the caller flushes by isolate key itself, for example on Flink checkpoints.
+     *
+     * @param autoFlush
+     */
+    public void setAutoFlush(boolean autoFlush) {
+        this.autoFlush = autoFlush;
+    }
+
     /**
      * Init
      */
@@ -82,7 +115,10 @@ public class AuditReporterImpl implements Serializable {
             public void run() {
                 try {
                     loadIpPortList();
-                    send();
+                    if (autoFlush) {
+                        flush(DEFAULT_ISOLATE_KEY);
+                    }
+                    checkFlushTime();
                 } catch (Exception e) {
                     LOGGER.error(e.getMessage());
                 }
@@ -198,50 +234,208 @@ public class AuditReporterImpl implements Serializable {
         keyJoiner.add(String.valueOf(auditID));
         keyJoiner.add(auditTag);
         keyJoiner.add(String.valueOf(auditVersion));
-        addByKey(keyJoiner.toString(), count, size, delayTime);
+        addByKey(DEFAULT_ISOLATE_KEY, keyJoiner.toString(), count, size, 
delayTime);
+    }
+
+    /**
+     * When the caller needs to isolate resources, please call this method.
+     * For example, in scenarios such as flink checkpoint
+     *
+     * @param dimensions
+     * @param values
+     */
+    public void add(AuditDimensions dimensions, AuditValues values) {
+        StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS);
+        keyJoiner.add(String.valueOf(dimensions.getLogTime() / PERIOD));
+        keyJoiner.add(dimensions.getInlongGroupID());
+        keyJoiner.add(dimensions.getInlongStreamID());
+        keyJoiner.add(String.valueOf(dimensions.getAuditID()));
+        keyJoiner.add(dimensions.getAuditTag());
+        keyJoiner.add(String.valueOf(dimensions.getAuditVersion()));
+        addByKey(dimensions.getIsolateKey(), keyJoiner.toString(), 
values.getCount(),
+                values.getSize(), values.getDelayTime());
     }
 
     /**
      * Add audit info by key.
      */
-    private void addByKey(String key, long count, long size, long delayTime) {
-        if (countMap.get(key) == null) {
-            countMap.put(key, new StatInfo(0L, 0L, 0L));
+    private void addByKey(long isolateKey, String statKey, long count, long 
size, long delayTime) {
+        if (null == this.preStatMap.get(isolateKey)) {
+            this.preStatMap.putIfAbsent(isolateKey, new ConcurrentHashMap<>());
+        }
+        ConcurrentHashMap<String, StatInfo> statMap = 
this.preStatMap.get(isolateKey);
+        if (null == statMap.get(statKey)) {
+            statMap.putIfAbsent(statKey, new StatInfo(0L, 0L, 0L));
         }
-        countMap.get(key).count.addAndGet(count);
-        countMap.get(key).size.addAndGet(size);
-        countMap.get(key).delay.addAndGet(delayTime);
+        StatInfo stat = statMap.get(statKey);
+        stat.count.addAndGet(count);
+        stat.size.addAndGet(size);
+        stat.delay.addAndGet(delayTime);
+    }
+
+    /**
+     * Flush the audit data stored under DEFAULT_ISOLATE_KEY, the key used by the plain add(...) overloads
+     */
+    public synchronized void flush() {
+        flush(DEFAULT_ISOLATE_KEY);
     }
 
     /**
-     * Send audit data
+     * Flush audit data
      */
-    public synchronized void send() {
+    public synchronized void flush(long isolateKey) {
+        if (flushTime.putIfAbsent(isolateKey, System.currentTimeMillis()) != 
null
+                || flushStat.addAndGet(1) > flushStatThreshold) {
+            return;
+        }
+        LOGGER.info("Audit flush isolate key {} ", isolateKey);
         manager.clearBuffer();
         resetStat();
-        // Retrieve statistics from the list of objects without statistics to 
be eliminated
-        for (Map.Entry<String, StatInfo> entry : 
this.deleteCountMap.entrySet()) {
-            this.sumThreadGroup(entry.getKey(), entry.getValue());
+        LOGGER.info("pre stat map size {} {} {} {}", this.preStatMap.size(), 
this.expiredStatMap.size(),
+                this.summaryStatMap.size(), this.expiredKeyList.size());
+
+        summaryExpiredStatMap(isolateKey);
+
+        Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> 
iterator = this.preStatMap.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = 
iterator.next();
+            if (entry.getValue().isEmpty()) {
+                LOGGER.info("Remove the key of pre stat map: {},isolate key: 
{} ", entry.getKey(), isolateKey);
+                iterator.remove();
+                continue;
+            }
+            if (entry.getKey() > isolateKey) {
+                continue;
+            }
+            summaryPreStatMap(entry.getKey(), entry.getValue());
+            send(entry.getKey());
+
+        }
+
+        clearExpiredKey(isolateKey);
+
+        LOGGER.info("Finish report audit data");
+    }
+
+    /**
+     * Send base command
+     */
+    private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
+        AuditApi.BaseCommand.Builder baseCommand = 
AuditApi.BaseCommand.newBuilder();
+        
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
+        manager.send(baseCommand.build(), auditRequest);
+    }
+
+    /**
+     * Summary
+     */
+    private void sumThreadGroup(long isolateKey, String key, StatInfo 
statInfo) {
+        long count = statInfo.count.getAndSet(0);
+        if (0 == count) {
+            return;
+        }
+        ConcurrentHashMap<String, StatInfo> sumMap =
+                this.summaryStatMap.computeIfAbsent(isolateKey, k -> new 
ConcurrentHashMap<>());
+        StatInfo stat = sumMap.computeIfAbsent(key, k -> new StatInfo(0L, 0L, 
0L));
+        stat.count.addAndGet(count);
+        stat.size.addAndGet(statInfo.size.getAndSet(0));
+        stat.delay.addAndGet(statInfo.delay.getAndSet(0));
+    }
+
+    /**
+     * Reset statistics
+     */
+    private void resetStat() {
+        dataId = 0;
+        packageId = 1;
+    }
+
+    /**
+     * Summary expired stat map
+     */
+    private void summaryExpiredStatMap(long isolateKey) {
+        Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> 
iterator =
+                this.expiredStatMap.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> entry = 
iterator.next();
+            if (entry.getValue().isEmpty()) {
+                LOGGER.info("Remove the key of expired stat map: {},isolate 
key: {} ", entry.getKey(), isolateKey);
+                iterator.remove();
+                continue;
+            }
+            if (entry.getKey() > isolateKey) {
+                continue;
+            }
+            for (Map.Entry<String, StatInfo> statInfo : 
entry.getValue().entrySet()) {
+                this.sumThreadGroup(isolateKey, statInfo.getKey(), 
statInfo.getValue());
+            }
+            entry.getValue().clear();
         }
-        this.deleteCountMap.clear();
-        for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
+    }
+
+    /**
+     * Summary pre stat map
+     */
+    private void summaryPreStatMap(long isolateKey, ConcurrentHashMap<String, 
StatInfo> statInfo) {
+        List<String> expiredKeys = 
this.expiredKeyList.computeIfAbsent(isolateKey, k -> new ArrayList<>());
+
+        for (Map.Entry<String, StatInfo> entry : statInfo.entrySet()) {
             String key = entry.getKey();
             StatInfo value = entry.getValue();
             // If there is no data, enter the list to be eliminated
             if (value.count.get() == 0) {
-                this.deleteKeyList.add(key);
+                if (!expiredKeys.contains(key)) {
+                    expiredKeys.add(key);
+                }
                 continue;
             }
-            this.sumThreadGroup(key, value);
+            sumThreadGroup(isolateKey, key, value);
         }
+    }
 
-        // Clean up obsolete statistical data objects
-        for (String key : this.deleteKeyList) {
-            StatInfo value = this.countMap.remove(key);
-            this.deleteCountMap.put(key, value);
+    /**
+     * Clear expired key
+     */
+    private void clearExpiredKey(long isolateKey) {
+        Iterator<Map.Entry<Long, List<String>>> iterator =
+                this.expiredKeyList.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, List<String>> entry = iterator.next();
+            if (entry.getValue().isEmpty()) {
+                LOGGER.info("Remove the key of expired key list: {},isolate 
key: {}", entry.getKey(), isolateKey);
+                iterator.remove();
+                continue;
+            }
+            if (entry.getKey() > isolateKey) {
+                continue;
+            }
+
+            ConcurrentHashMap<String, StatInfo> preStatInfo = 
this.preStatMap.get(entry.getKey());
+            if (null == preStatInfo) {
+                iterator.remove();
+                continue;
+            }
+            ConcurrentHashMap<String, StatInfo> deleteMap =
+                    this.expiredStatMap.computeIfAbsent(entry.getKey(), k -> 
new ConcurrentHashMap<>());
+            for (String key : entry.getValue()) {
+                StatInfo value = preStatInfo.remove(key);
+                deleteMap.put(key, value);
+            }
+            entry.getValue().clear();
         }
-        this.deleteKeyList.clear();
+    }
 
+    /**
+     * Send Audit data
+     */
+    private void send(long isolateKey) {
+        if (null == summaryStatMap.get(isolateKey)) {
+            return;
+        }
+        if (summaryStatMap.get(isolateKey).isEmpty()) {
+            summaryStatMap.remove(isolateKey);
+            return;
+        }
         long sdkTime = Calendar.getInstance().getTimeInMillis();
         AuditApi.AuditMessageHeader msgHeader = 
AuditApi.AuditMessageHeader.newBuilder()
                 .setIp(config.getLocalIP()).setDockerId(config.getDockerId())
@@ -250,9 +444,8 @@ public class AuditReporterImpl implements Serializable {
                 .build();
         AuditApi.AuditRequest.Builder requestBuild = 
AuditApi.AuditRequest.newBuilder();
         
requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
-
         // Process the stat info for all threads
-        for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
+        for (Map.Entry<String, StatInfo> entry : 
summaryStatMap.get(isolateKey).entrySet()) {
             // Entry key order: logTime inlongGroupID inlongStreamID auditID 
auditTag auditVersion
             String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
             long logTime = Long.parseLong(keyArray[0]) * PERIOD;
@@ -282,49 +475,24 @@ public class AuditReporterImpl implements Serializable {
                 requestBuild.clearMsgBody();
             }
         }
+
         if (requestBuild.getMsgBodyCount() > 0) {
             sendByBaseCommand(requestBuild.build());
             requestBuild.clearMsgBody();
         }
-        threadCountMap.clear();
-
-        LOGGER.info("Finish report audit data");
-    }
-
-    /**
-     * Send base command
-     */
-    private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
-        AuditApi.BaseCommand.Builder baseCommand = 
AuditApi.BaseCommand.newBuilder();
-        
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
-        manager.send(baseCommand.build(), auditRequest);
+        summaryStatMap.get(isolateKey).clear();
     }
 
     /**
-     * Summary
+     * Check flush time
      */
-    private void sumThreadGroup(String key, StatInfo statInfo) {
-        long count = statInfo.count.getAndSet(0);
-        if (0 == count) {
-            return;
-        }
-        if (threadCountMap.get(key) == null) {
-            threadCountMap.put(key, new StatInfo(0, 0, 0));
-        }
-
-        long size = statInfo.size.getAndSet(0);
-        long delay = statInfo.delay.getAndSet(0);
-
-        threadCountMap.get(key).count.addAndGet(count);
-        threadCountMap.get(key).size.addAndGet(size);
-        threadCountMap.get(key).delay.addAndGet(delay);
-    }
-
-    /**
-     * Reset statistics
-     */
-    private void resetStat() {
-        dataId = 0;
-        packageId = 1;
+    private void checkFlushTime() {
+        flushStat.set(0);
+        long currentTime = Calendar.getInstance().getTimeInMillis();
+        flushTime.forEach((key, value) -> {
+            if ((currentTime - value) > PERIOD) {
+                flushTime.remove(key);
+            }
+        });
     }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index f3a573c4f9..7b886021a1 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -139,24 +139,7 @@ public class SenderManager {
         // cache first
         Long requestId = baseCommand.getAuditRequest().getRequestId();
         this.dataMap.putIfAbsent(requestId, data);
-        requestIdQueue.offer(requestId);
         this.sendData(data.getDataByte());
-        // resend
-        long newTime = System.currentTimeMillis() - 10000;
-        if (newTime > lastCheckTime) {
-            for (int i = 0; i < requestIdQueue.size(); i++) {
-                Long current = requestIdQueue.poll();
-                AuditData auditData = this.dataMap.get(current);
-                if (auditData == null) {
-                    continue;
-                } else {
-                    requestIdQueue.offer(current);
-                    if (newTime > auditData.getSendTime()) {
-                        this.sendData(auditData.getDataByte());
-                    }
-                }
-            }
-        }
     }
 
     /**
@@ -179,7 +162,7 @@ public class SenderManager {
      * Clean up the backlog of unsent message packets
      */
     public void clearBuffer() {
-        LOG.info("audit failed cache size: {}", this.dataMap.size());
+        LOG.info("Audit failed cache size: {}", this.dataMap.size());
         for (AuditData data : this.dataMap.values()) {
             this.sendData(data.getDataByte());
             this.sleep();
@@ -286,6 +269,11 @@ public class SenderManager {
             if (data == null) {
                 LOG.error("Can not find the request id onMessageReceived 
{},message: {}",
                         requestId, baseCommand.getAuditReply().getMessage());
+                if (LOG.isDebugEnabled()) {
+                    for (Map.Entry<Long, AuditData> entry : 
this.dataMap.entrySet()) {
+                        LOG.debug("Data map key:{},request id:{}", 
entry.getKey(), requestId);
+                    }
+                }
                 return;
             }
             // check resp
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java
new file mode 100644
index 0000000000..9a8741a6a9
--- /dev/null
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditDimensions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+/**
+ * Audit dimensions
+ */
+public class AuditDimensions {
+
+    private int auditID;
+    private long logTime;
+    private long auditVersion;
+    private long isolateKey;
+    private String auditTag;
+    private String inlongGroupID;
+    private String inlongStreamID;
+
+    public AuditDimensions(int auditID, long logTime, long auditVersion, long 
isolateKey, String auditTag,
+            String inlongGroupID, String inlongStreamID) {
+        this.auditID = auditID;
+        this.logTime = logTime;
+        this.auditVersion = auditVersion;
+        this.isolateKey = isolateKey;
+        this.auditTag = auditTag;
+        this.inlongGroupID = inlongGroupID;
+        this.inlongStreamID = inlongStreamID;
+    }
+
+    public int getAuditID() {
+        return auditID;
+    }
+
+    public void setAuditID(int auditID) {
+        this.auditID = auditID;
+    }
+
+    public long getLogTime() {
+        return logTime;
+    }
+
+    public void setLogTime(long logTime) {
+        this.logTime = logTime;
+    }
+
+    public long getAuditVersion() {
+        return auditVersion;
+    }
+
+    public void setAuditVersion(long auditVersion) {
+        this.auditVersion = auditVersion;
+    }
+
+    public long getIsolateKey() {
+        return isolateKey;
+    }
+
+    public void setIsolateKey(long isolateKey) {
+        this.isolateKey = isolateKey;
+    }
+
+    public String getAuditTag() {
+        return auditTag;
+    }
+
+    public void setAuditTag(String auditTag) {
+        this.auditTag = auditTag;
+    }
+
+    public String getInlongGroupID() {
+        return inlongGroupID;
+    }
+
+    public void setInlongGroupID(String inlongGroupID) {
+        this.inlongGroupID = inlongGroupID;
+    }
+
+    public String getInlongStreamID() {
+        return inlongStreamID;
+    }
+
+    public void setInlongStreamID(String inlongStreamID) {
+        this.inlongStreamID = inlongStreamID;
+    }
+}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java
new file mode 100644
index 0000000000..d5b74856bd
--- /dev/null
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditValues.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.util;
+
+/**
+ * Audit values
+ */
+public class AuditValues {
+
+    private long count;
+    private long size;
+    private long delayTime;
+
+    public AuditValues(long count, long size, long delayTime) {
+        this.count = count;
+        this.size = size;
+        this.delayTime = delayTime;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    public long getDelayTime() {
+        return delayTime;
+    }
+
+    public void setDelayTime(long delayTime) {
+        this.delayTime = delayTime;
+    }
+}
diff --git a/inlong-audit/sql/apache_inlong_audit.sql 
b/inlong-audit/sql/apache_inlong_audit.sql
index 2d10ca697c..a2b007534a 100644
--- a/inlong-audit/sql/apache_inlong_audit.sql
+++ b/inlong-audit/sql/apache_inlong_audit.sql
@@ -43,6 +43,7 @@ CREATE TABLE IF NOT EXISTS `audit_data`
     `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target 
inlong stream id',
     `audit_id`         varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
     `audit_tag`        varchar(100) DEFAULT '' COMMENT 'Audit tag',
+    `audit_version`    BIGINT       DEFAULT -1  COMMENT 'Audit version',
     `count`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
count',
     `size`             BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
size',
     `delay`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message 
delay count',
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 7bd27521c6..371a4de0c9 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -123,6 +123,6 @@ public class AuditUtils {
      * Send audit data
      */
     public static void send() {
-        AuditOperator.getInstance().send();
+        AuditOperator.getInstance().flush();
     }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
index 795d505c1f..f5bd2ebbc2 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
@@ -104,6 +104,6 @@ public class AuditUtils {
      * Send audit data
      */
     public static void send() {
-        AuditOperator.getInstance().send();
+        AuditOperator.getInstance().flush();
     }
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 1b09343b5c..7270becdb7 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -285,7 +285,7 @@ public class SinkMetricData implements MetricData, 
Serializable {
      */
     public void flushAuditData() {
         if (auditOperator != null) {
-            auditOperator.send();
+            auditOperator.flush();
         }
     }
 
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 91abcf22aa..0568efd7c3 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -341,7 +341,7 @@ public class SourceMetricData implements MetricData, 
Serializable {
      */
     public void flushAuditData() {
         if (auditOperator != null) {
-            auditOperator.send();
+            auditOperator.flush();
         }
     }
 
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
index 33b0d9f165..0e0c287739 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
@@ -112,6 +112,6 @@ public class AuditUtils {
         if (!auditConfig.isAuditEnable()) {
             return;
         }
-        AuditOperator.getInstance().send();
+        AuditOperator.getInstance().flush();
     }
 }

Reply via email to