This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c28583ff [INLONG-11743][SDK] Adjustment of metric statistics (#11744)
63c28583ff is described below

commit 63c28583ffe568ae25d21e2e3a860ab44d5926f9
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Feb 11 14:09:13 2025 +0800

    [INLONG-11743][SDK] Adjustment of metric statistics (#11744)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/sdk/dataproxy/metric/MetaSyncInfo.java  |   4 +-
 .../sdk/dataproxy/metric/MetricDataHolder.java     | 216 +++++++++++++++++----
 .../inlong/sdk/dataproxy/metric/TimeCostInfo.java  |   4 +-
 .../inlong/sdk/dataproxy/metric/TrafficInfo.java   | 151 ++++++++++----
 .../sdk/dataproxy/network/http/HttpClientMgr.java  |  55 +++++-
 .../sdk/dataproxy/network/tcp/TcpClientMgr.java    |   2 +-
 .../sdk/dataproxy/network/tcp/TcpNettyClient.java  |   2 +-
 .../inlong/sdk/dataproxy/sender/BaseSender.java    |   4 +-
 .../dataproxy/sender/http/InLongHttpMsgSender.java |  11 +-
 .../dataproxy/sender/tcp/InLongTcpMsgSender.java   |  26 ++-
 10 files changed, 367 insertions(+), 108 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
index 2539c7ae53..b574eff2ac 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
@@ -46,12 +46,12 @@ public class MetaSyncInfo {
 
     public void getAndResetValue(StringBuilder strBuff) {
         if (syncErrInfo.isEmpty()) {
-            strBuff.append("\"mSync\":{\"errT\":{},");
+            strBuff.append("\"ms\":{\"errT\":{},");
             syncCostMs.getAndResetValue(strBuff);
             strBuff.append("}");
         } else {
             long curCnt = 0;
-            strBuff.append("\"mSync\":{\"errT\":{");
+            strBuff.append("\"ms\":{\"errT\":{");
             for (Map.Entry<Integer, LongAdder> entry : syncErrInfo.entrySet()) 
{
                 if (curCnt++ > 0) {
                     strBuff.append(",");
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 9b8e19560c..4b16f62c8d 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -40,7 +40,7 @@ public class MetricDataHolder implements Runnable {
 
     private static final String DEFAULT_KEY_SPLITTER = "#";
     private static final Logger logger = 
LoggerFactory.getLogger(MetricDataHolder.class);
-    private static final LogCounter exceptCnt = new LogCounter(10, 100000, 60 
* 1000L);
+    private static final LogCounter exceptCnt = new LogCounter(5, 100000, 60 * 
1000L);
 
     private final MetricConfig metricConfig;
     private final BaseSender sender;
@@ -95,7 +95,7 @@ public class MetricDataHolder implements Runnable {
     }
 
     public void addMetaSyncMetric(int errCode, long syncCostMs) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+        if (!this.metricConfig.isEnableMetric()) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[itemIndex];
@@ -107,28 +107,56 @@ public class MetricDataHolder implements Runnable {
         }
     }
 
-    public void addSucMetric(String groupId, String streamId, int msgCnt, long 
costMs) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+    public void addSyncSucMetric(String groupId, String streamId, int msgCnt, 
long costMs) {
+        if (!this.metricConfig.isEnableMetric()) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[itemIndex];
         selectedUnit.refCnt.incrementAndGet();
         try {
-            selectedUnit.addSucMsgInfo(groupId,
+            selectedUnit.addSyncSendSucInfo(groupId,
                     (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt, costMs);
         } finally {
             selectedUnit.refCnt.decrementAndGet();
         }
     }
 
-    public void addFailMetric(int errCode, String groupId, String streamId, 
int msgCnt) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+    public void addSyncFailMetric(int errCode, String groupId, String 
streamId, int msgCnt) {
+        if (!this.metricConfig.isEnableMetric()) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[itemIndex];
         selectedUnit.refCnt.incrementAndGet();
         try {
-            selectedUnit.addFailMsgInfo(groupId,
+            selectedUnit.addSyncSendFailInfo(groupId,
+                    (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt, errCode);
+        } finally {
+            selectedUnit.refCnt.decrementAndGet();
+        }
+    }
+
+    public void addAsyncSucReqMetric(String groupId, String streamId, int 
msgCnt) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+        selectedUnit.refCnt.incrementAndGet();
+        try {
+            selectedUnit.addAsyncSendSucInfo(groupId,
+                    (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt);
+        } finally {
+            selectedUnit.refCnt.decrementAndGet();
+        }
+    }
+
+    public void addAsyncFailReqMetric(int errCode, String groupId, String 
streamId, int msgCnt) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+        selectedUnit.refCnt.incrementAndGet();
+        try {
+            selectedUnit.addAsyncSendFailInfo(groupId,
                     (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt, errCode);
         } finally {
             selectedUnit.refCnt.decrementAndGet();
@@ -136,13 +164,13 @@ public class MetricDataHolder implements Runnable {
     }
 
     public void addCallbackSucMetric(String groupId, String streamId, int 
msgCnt, long costMs, long callDurMs) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+        if (!this.metricConfig.isEnableMetric()) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[itemIndex];
         selectedUnit.refCnt.incrementAndGet();
         try {
-            selectedUnit.addSucMsgInfo(groupId,
+            selectedUnit.addAsyncRspSucInfo(groupId,
                     (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt, costMs, callDurMs);
         } finally {
             selectedUnit.refCnt.decrementAndGet();
@@ -150,19 +178,61 @@ public class MetricDataHolder implements Runnable {
     }
 
     public void addCallbackFailMetric(int errCode, String groupId, String 
streamId, int msgCnt, long costMs) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+        if (!this.metricConfig.isEnableMetric()) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[itemIndex];
         selectedUnit.refCnt.incrementAndGet();
         try {
-            selectedUnit.addFailMsgInfo(groupId,
+            selectedUnit.addAsyncRspFailInfo(groupId,
                     (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt, errCode, costMs);
         } finally {
             selectedUnit.refCnt.decrementAndGet();
         }
     }
 
+    public void addAsyncHttpSucPutMetric(String groupId, String streamId, int 
msgCnt) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+        selectedUnit.refCnt.incrementAndGet();
+        try {
+            selectedUnit.addAsyncHttpPutSucInfo(groupId,
+                    (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt);
+        } finally {
+            selectedUnit.refCnt.decrementAndGet();
+        }
+    }
+
+    public void addAsyncHttpFailPutMetric(int errCode, String groupId, String 
streamId, int msgCnt) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+        selectedUnit.refCnt.incrementAndGet();
+        try {
+            selectedUnit.addAsyncHttpPutFailInfo(groupId,
+                    (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt, errCode);
+        } finally {
+            selectedUnit.refCnt.decrementAndGet();
+        }
+    }
+
+    public void addAsyncHttpSucGetMetric(String groupId, String streamId, int 
msgCnt) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+        selectedUnit.refCnt.incrementAndGet();
+        try {
+            selectedUnit.addAsyncHttpGetSucInfo(groupId,
+                    (this.metricConfig.isMaskStreamId() ? "" : streamId), 
msgCnt);
+        } finally {
+            selectedUnit.refCnt.decrementAndGet();
+        }
+    }
+
     private void outputMetricData(boolean forceOutput, long reportTime, int 
readIndex) {
         if (!this.metricConfig.isEnableMetric()) {
             return;
@@ -180,11 +250,7 @@ public class MetricDataHolder implements Runnable {
                     || (System.currentTimeMillis() - startTime >= 5000L)) {
                 break;
             }
-            try {
-                Thread.sleep(3);
-            } catch (InterruptedException e) {
-                break;
-            }
+            ProxyUtils.sleepSomeTime(80);
         } while (selectedUnit.refCnt.get() > 0);
         if (!forceOutput && !this.started) {
             logger.info("Metric DataHolder({}) closed, stop output metric 
info",
@@ -218,7 +284,47 @@ public class MetricDataHolder implements Runnable {
         protected final ConcurrentHashMap<String, TrafficInfo> trafficMap = 
new ConcurrentHashMap<>();
         protected final ConcurrentHashMap<Integer, LongAdder> errCodeMap = new 
ConcurrentHashMap<>();
 
-        public void addSucMsgInfo(String groupId, String streamId, int msgCnt, 
long costMs) {
+        public void addSyncSendSucInfo(String groupId, String streamId, int 
msgCnt, long costMs) {
+            String recordKey = getKeyStringByConfig(groupId, streamId);
+            TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+            if (trafficInfo == null) {
+                TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+                trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+                if (trafficInfo == null) {
+                    trafficInfo = tmpInfo;
+                }
+            }
+            trafficInfo.addSyncSucMsgInfo(msgCnt, costMs);
+        }
+
+        public void addSyncSendFailInfo(String groupId, String streamId, int 
msgCnt, int errCode) {
+            String recordKey = getKeyStringByConfig(groupId, streamId);
+            TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+            if (trafficInfo == null) {
+                TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+                trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+                if (trafficInfo == null) {
+                    trafficInfo = tmpInfo;
+                }
+            }
+            trafficInfo.addSyncFailMsgInfo(msgCnt);
+            addSendErrCodeInfo(errCode);
+        }
+
+        public void addAsyncSendSucInfo(String groupId, String streamId, int 
msgCnt) {
+            String recordKey = getKeyStringByConfig(groupId, streamId);
+            TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+            if (trafficInfo == null) {
+                TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+                trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+                if (trafficInfo == null) {
+                    trafficInfo = tmpInfo;
+                }
+            }
+            trafficInfo.addAsyncSucSendInfo(msgCnt);
+        }
+
+        public void addAsyncSendFailInfo(String groupId, String streamId, int 
msgCnt, int errCode) {
             String recordKey = getKeyStringByConfig(groupId, streamId);
             TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
             if (trafficInfo == null) {
@@ -228,10 +334,11 @@ public class MetricDataHolder implements Runnable {
                     trafficInfo = tmpInfo;
                 }
             }
-            trafficInfo.addSucMsgInfo(msgCnt, costMs);
+            trafficInfo.addAsyncFailSendInfo(msgCnt);
+            addSendErrCodeInfo(errCode);
         }
 
-        public void addSucMsgInfo(String groupId, String streamId, int msgCnt, 
long sdCostMs, long cbCostMs) {
+        public void addAsyncHttpPutSucInfo(String groupId, String streamId, 
int msgCnt) {
             String recordKey = getKeyStringByConfig(groupId, streamId);
             TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
             if (trafficInfo == null) {
@@ -241,10 +348,10 @@ public class MetricDataHolder implements Runnable {
                     trafficInfo = tmpInfo;
                 }
             }
-            trafficInfo.addSucMsgInfo(msgCnt, sdCostMs, cbCostMs);
+            trafficInfo.addAsyncHttpSucPutInfo(msgCnt);
         }
 
-        public void addFailMsgInfo(String groupId, String streamId, int 
msgCnt, int errCode) {
+        public void addAsyncHttpPutFailInfo(String groupId, String streamId, 
int msgCnt, int errCode) {
             String recordKey = getKeyStringByConfig(groupId, streamId);
             TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
             if (trafficInfo == null) {
@@ -254,12 +361,37 @@ public class MetricDataHolder implements Runnable {
                     trafficInfo = tmpInfo;
                 }
             }
-            trafficInfo.addFailMsgInfo(msgCnt);
+            trafficInfo.addAsyncHttpFailPutInfo(msgCnt);
             addSendErrCodeInfo(errCode);
         }
 
-        public void addFailMsgInfo(String groupId, String streamId,
-                int msgCnt, int errCode, long cbCostMs) {
+        public void addAsyncHttpGetSucInfo(String groupId, String streamId, 
int msgCnt) {
+            String recordKey = getKeyStringByConfig(groupId, streamId);
+            TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+            if (trafficInfo == null) {
+                TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+                trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+                if (trafficInfo == null) {
+                    trafficInfo = tmpInfo;
+                }
+            }
+            trafficInfo.addAsyncHttpSucGetInfo(msgCnt);
+        }
+
+        public void addAsyncRspSucInfo(String groupId, String streamId, int 
msgCnt, long sdCostMs, long cbCostMs) {
+            String recordKey = getKeyStringByConfig(groupId, streamId);
+            TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+            if (trafficInfo == null) {
+                TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+                trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+                if (trafficInfo == null) {
+                    trafficInfo = tmpInfo;
+                }
+            }
+            trafficInfo.addAsyncSucRspInfo(msgCnt, sdCostMs, cbCostMs);
+        }
+
+        public void addAsyncRspFailInfo(String groupId, String streamId, int 
msgCnt, int errCode, long cbCostMs) {
             String recordKey = getKeyStringByConfig(groupId, streamId);
             TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
             if (trafficInfo == null) {
@@ -269,11 +401,11 @@ public class MetricDataHolder implements Runnable {
                     trafficInfo = tmpInfo;
                 }
             }
-            trafficInfo.addFailMsgInfo(msgCnt, cbCostMs);
+            trafficInfo.addAsyncFailRspInfo(msgCnt, cbCostMs);
             addSendErrCodeInfo(errCode);
         }
 
-        public void addSendErrCodeInfo(int errCode) {
+        private void addSendErrCodeInfo(int errCode) {
             LongAdder longCount = this.errCodeMap.get(errCode);
             if (longCount == null) {
                 LongAdder tmpCount = new LongAdder();
@@ -288,7 +420,7 @@ public class MetricDataHolder implements Runnable {
         public void getAndResetValue(StringBuilder strBuff) {
             int count = 0;
             metaSyncInfo.getAndResetValue(strBuff);
-            strBuff.append(",\"m\":[");
+            strBuff.append(",\"tr\":[");
             for (Map.Entry<String, TrafficInfo> entry : trafficMap.entrySet()) 
{
                 if (count++ > 0) {
                     strBuff.append(",");
@@ -325,9 +457,9 @@ public class MetricDataHolder implements Runnable {
                 .append(",\"lrT\":").append(lstReportTime)
                 .append(",");
         metricUnit.getAndResetValue(strBuff);
-        strBuff.append(",\"s\":{\"tNodes\":").append(sender.getProxyNodeCnt())
-                .append(",\"aNodes\":").append(sender.getActiveNodeCnt())
-                .append(",\"ifReqs\":").append(sender.getInflightMsgCnt())
+        strBuff.append(",\"s\":{\"tNs\":").append(sender.getProxyNodeCnt())
+                .append(",\"aNs\":").append(sender.getActiveNodeCnt())
+                .append(",\"ifRs\":").append(sender.getInflightMsgCnt())
                 
.append("},\"c\":{\"aC\":").append(sender.getConfigure().getAliveConnections())
                 
.append(",\"rP\":\"").append(sender.getConfigure().getDataRptProtocol())
                 
.append("\",\"rG\":\"").append(sender.getConfigure().getRegionName())
@@ -335,22 +467,22 @@ public class MetricDataHolder implements Runnable {
         if (sender instanceof TcpMsgSender) {
             TcpMsgSenderConfig tcpConfig = (TcpMsgSenderConfig) 
sender.getConfigure();
             
strBuff.append(",\"mT\":").append(tcpConfig.getSdkMsgType().getValue())
-                    
.append(",\"comp\":").append(tcpConfig.isEnableDataCompress())
-                    
.append(",\"mCLen\":").append(tcpConfig.getMinCompEnableLength())
-                    
.append(",\"lfSep\":").append(tcpConfig.isSeparateEventByLF())
+                    
.append(",\"cp\":").append(tcpConfig.isEnableDataCompress())
+                    
.append(",\"mCp\":").append(tcpConfig.getMinCompEnableLength())
+                    .append(",\"lf\":").append(tcpConfig.isSeparateEventByLF())
                     
.append(",\"nWk\":").append(tcpConfig.getNettyWorkerThreadNum())
                     .append(",\"sB\":").append(tcpConfig.getSendBufferSize())
                     .append(",\"rB\":").append(tcpConfig.getRcvBufferSize())
-                    
.append(",\"cOut\":").append(tcpConfig.getConnectTimeoutMs())
-                    
.append(",\"rOut\":").append(tcpConfig.getRequestTimeoutMs())
-                    
.append(",\"syncOut\":").append(tcpConfig.getMaxAllowedSyncMsgTimeoutCnt());
+                    
.append(",\"cOt\":").append(tcpConfig.getConnectTimeoutMs())
+                    
.append(",\"rOt\":").append(tcpConfig.getRequestTimeoutMs())
+                    
.append(",\"syOt\":").append(tcpConfig.getMaxAllowedSyncMsgTimeoutCnt());
         } else {
             HttpMsgSenderConfig httpConfig = (HttpMsgSenderConfig) 
sender.getConfigure();
-            
strBuff.append(",\"iHttps\":").append(httpConfig.isRptDataByHttps())
-                    
.append(",\"sOut\":").append(httpConfig.getHttpSocketTimeoutMs())
-                    
.append(",\"cOut\":").append(httpConfig.getHttpConTimeoutMs())
-                    
.append(",\"asyWk\":").append(httpConfig.getHttpAsyncRptWorkerNum())
-                    
.append(",\"asyCh\":").append(httpConfig.getHttpAsyncRptCacheSize());
+            strBuff.append(",\"iHs\":").append(httpConfig.isRptDataByHttps())
+                    
.append(",\"sOt\":").append(httpConfig.getHttpSocketTimeoutMs())
+                    
.append(",\"cOt\":").append(httpConfig.getHttpConTimeoutMs())
+                    
.append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum())
+                    
.append(",\"aC\":").append(httpConfig.getHttpAsyncRptCacheSize());
         }
         String content = strBuff.append("}}").toString();
         strBuff.delete(0, strBuff.length());
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
index 1bd33306a1..5a5ad7deb5 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
@@ -73,10 +73,10 @@ public class TimeCostInfo {
         long curTotalCnt = totalCnt.sumThenReset();
         if (curTotalCnt == 0) {
             strBuff.append("\"").append(name)
-                    
.append("\":{\"bucketT\":{},\"min\":0,\"max\":0,\"avgT\":0}");
+                    .append("\":{\"bkts\":{},\"min\":0,\"max\":0,\"avgT\":0}");
         } else {
             long bucketCnt = 0;
-            strBuff.append("\"").append(name).append("\":{\"bucketT\":{");
+            strBuff.append("\"").append(name).append("\":{\"bkts\":{");
             for (Map.Entry<String, LongAdder> entry : 
sendTimeBucketT.entrySet()) {
                 if (bucketCnt++ > 0) {
                     strBuff.append(",");
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
index 2622c47cef..759148f4ff 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
@@ -27,16 +27,44 @@ public class TrafficInfo {
     private final String groupId;
     // sId
     private final String streamId;
-    // sPkg
-    private final LongAdder sendPkgCount = new LongAdder();
-    // sMsg
-    private final LongAdder sendMsgCount = new LongAdder();
-    // fMsg
-    private final LongAdder failedPgkCount = new LongAdder();
-    // fMsg
-    private final LongAdder failedMsgCount = new LongAdder();
-    // sucMs: send success time cost in Ms
-    private final TimeCostInfo sendCostMs = new TimeCostInfo("sucMs");
+
+    // sPs
+    private final LongAdder syncSendPkgCnt = new LongAdder();
+    // sMs
+    private final LongAdder syncSendMsgCount = new LongAdder();
+    // sPf
+    private final LongAdder syncFailedPgkCount = new LongAdder();
+    // sSucMs: sync call time cost in Ms
+    private final TimeCostInfo syncSendCostMs = new TimeCostInfo("sSucMs");
+    // http
+    // apPs
+    private final LongAdder aSyncHttpPkgPutCnt = new LongAdder();
+    // apMs
+    private final LongAdder aSyncHttpMsgPutCnt = new LongAdder();
+    // apPf
+    private final LongAdder aSyncHttpFailPkgPutCnt = new LongAdder();
+    // agPs
+    private final LongAdder aSyncHttpPkgGetCnt = new LongAdder();
+    // agMs
+    private final LongAdder aSyncHttpMsgGetCnt = new LongAdder();
+    // aPs
+    private final LongAdder aSyncSendPkgCount = new LongAdder();
+    // aMs
+    private final LongAdder aSyncSendMsgCount = new LongAdder();
+    // aPf
+    private final LongAdder aSyncFailedPgkCnt = new LongAdder();
+    // arPs
+    private final LongAdder aRcvPkgCount = new LongAdder();
+    // arMs
+    private final LongAdder aRcvMsgCount = new LongAdder();
+    // arPf
+    private final LongAdder aRcvFailedPgkCount = new LongAdder();
+    // arMf
+    private final LongAdder aRcvFailedMsgCount = new LongAdder();
+    // aSucMs: async received time cost in Ms
+    private final TimeCostInfo asyncSucCostMs = new TimeCostInfo("aSucMs");
+    // cbCt: callback call count
+    private final LongAdder cbCallCount = new LongAdder();
     // cbMs: call back time cost in Ms
     private final TimeCostInfo callbackCostMs = new TimeCostInfo("cbMs");
 
@@ -45,27 +73,51 @@ public class TrafficInfo {
         this.streamId = streamId;
     }
 
-    public void addSucMsgInfo(int msgCnt, long costMs) {
-        sendPkgCount.add(1);
-        sendMsgCount.add(msgCnt);
-        sendCostMs.addTimeCostInMs(costMs);
+    public void addSyncSucMsgInfo(int msgCnt, long costMs) {
+        syncSendPkgCnt.increment();
+        syncSendMsgCount.add(msgCnt);
+        syncSendCostMs.addTimeCostInMs(costMs);
     }
 
-    public void addSucMsgInfo(int msgCnt, long sdCostMs, long cbCostMs) {
-        sendPkgCount.add(1);
-        sendMsgCount.add(msgCnt);
-        sendCostMs.addTimeCostInMs(sdCostMs);
-        callbackCostMs.addTimeCostInMs(cbCostMs);
+    public void addSyncFailMsgInfo(int msgCnt) {
+        syncFailedPgkCount.increment();
+    }
+
+    public void addAsyncSucSendInfo(int msgCnt) {
+        aSyncSendPkgCount.increment();
+        aSyncSendMsgCount.add(msgCnt);
     }
 
-    public void addFailMsgInfo(int msgCnt) {
-        failedPgkCount.add(1);
-        failedMsgCount.add(msgCnt);
+    public void addAsyncFailSendInfo(int msgCnt) {
+        aSyncFailedPgkCnt.increment();
+    }
+
+    public void addAsyncHttpSucPutInfo(int msgCnt) {
+        aSyncHttpPkgPutCnt.increment();
+        aSyncHttpMsgPutCnt.add(msgCnt);
+    }
+
+    public void addAsyncHttpFailPutInfo(int msgCnt) {
+        aSyncHttpFailPkgPutCnt.increment();
+    }
+
+    public void addAsyncHttpSucGetInfo(int msgCnt) {
+        aSyncHttpPkgGetCnt.increment();
+        aSyncHttpMsgGetCnt.add(msgCnt);
+    }
+
+    public void addAsyncSucRspInfo(int msgCnt, long sdCostMs, long cbCostMs) {
+        aRcvPkgCount.increment();
+        aRcvMsgCount.add(msgCnt);
+        asyncSucCostMs.addTimeCostInMs(sdCostMs);
+        cbCallCount.increment();
+        callbackCostMs.addTimeCostInMs(cbCostMs);
     }
 
-    public void addFailMsgInfo(int msgCnt, long cbCostMs) {
-        failedPgkCount.add(1);
-        failedMsgCount.add(msgCnt);
+    public void addAsyncFailRspInfo(int msgCnt, long cbCostMs) {
+        aRcvFailedPgkCount.increment();
+        aRcvFailedMsgCount.add(msgCnt);
+        cbCallCount.increment();
         callbackCostMs.addTimeCostInMs(cbCostMs);
     }
 
@@ -74,23 +126,50 @@ public class TrafficInfo {
         if (StringUtils.isNotBlank(this.streamId)) {
             strBuff.append("\",\"sId\":\"").append(this.streamId);
         }
-        strBuff.append("\",\"sPkg\":").append(sendPkgCount.sumThenReset())
-                .append(",\"sMsg\":").append(sendMsgCount.sumThenReset())
-                .append(",\"fPkg\":").append(failedPgkCount.sumThenReset())
-                .append(",\"fMsg\":").append(failedMsgCount.sumThenReset())
-                .append(",");
-        this.sendCostMs.getAndResetValue(strBuff);
+        strBuff.append("\",\"sPs\":").append(syncSendPkgCnt.sumThenReset())
+                .append(",\"sPf\":").append(syncFailedPgkCount.sumThenReset())
+                
.append(",\"sMs\":").append(syncSendMsgCount.sumThenReset()).append(",");
+        this.syncSendCostMs.getAndResetValue(strBuff);
+        strBuff.append(",\"apPs\":").append(aSyncHttpPkgPutCnt.sumThenReset())
+                
.append(",\"apPf\":").append(aSyncHttpFailPkgPutCnt.sumThenReset())
+                .append(",\"agPs\":").append(aSyncHttpPkgGetCnt.sumThenReset())
+                .append(",\"aPs\":").append(aSyncSendPkgCount.sumThenReset())
+                .append(",\"aPf\":").append(aSyncFailedPgkCnt.sumThenReset())
+                .append(",\"arPs\":").append(aRcvPkgCount.sumThenReset())
+                .append(",\"arPf\":").append(aRcvFailedPgkCount.sumThenReset())
+                .append(",\"cbCt\":").append(cbCallCount.sumThenReset())
+                .append(",\"apMs\":").append(aSyncHttpMsgPutCnt.sumThenReset())
+                .append(",\"agMs\":").append(aSyncHttpMsgGetCnt.sumThenReset())
+                .append(",\"aMs\":").append(aSyncSendMsgCount.sumThenReset())
+                .append(",\"arMs\":").append(aRcvMsgCount.sumThenReset())
+                
.append(",\"arMf\":").append(aRcvFailedMsgCount.sumThenReset()).append(",");
+        this.asyncSucCostMs.getAndResetValue(strBuff);
         strBuff.append(",");
         this.callbackCostMs.getAndResetValue(strBuff);
         strBuff.append("}");
     }
 
     public void clear() {
-        this.sendPkgCount.reset();
-        this.sendMsgCount.reset();
-        this.failedPgkCount.reset();
-        this.failedMsgCount.reset();
-        this.sendCostMs.clear();
+        this.syncSendPkgCnt.reset();
+        this.syncSendMsgCount.reset();
+        this.syncFailedPgkCount.reset();
+        this.syncSendCostMs.clear();
+        // async send and HTTP put/get counters
+        this.aSyncSendPkgCount.reset();
+        this.aSyncSendMsgCount.reset();
+        this.aSyncFailedPgkCnt.reset();
+        this.aSyncHttpPkgPutCnt.reset();
+        this.aSyncHttpMsgPutCnt.reset();
+        this.aSyncHttpFailPkgPutCnt.reset();
+        this.aSyncHttpPkgGetCnt.reset();
+        this.aSyncHttpMsgGetCnt.reset();
+        // async response and callback counters
+        this.aRcvPkgCount.reset();
+        this.aRcvMsgCount.reset();
+        this.aRcvFailedPgkCount.reset();
+        this.aRcvFailedMsgCount.reset();
+        this.asyncSucCostMs.clear();
+        this.cbCallCount.reset();
         this.callbackCostMs.clear();
     }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
index bc4d2884c3..1700e394d5 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -80,6 +80,7 @@ public class HttpClientMgr implements ClientMgr {
     private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
     private final Semaphore asyncIdleCellCnt;
     private final ExecutorService workerServices = 
Executors.newCachedThreadPool();
+    private volatile boolean existSend = false;
     private final AtomicBoolean shutDown = new AtomicBoolean(false);
     // meta info
     private ConcurrentHashMap<String, HostInfo> usingNodeMaps = new 
ConcurrentHashMap<>();
@@ -126,9 +127,8 @@ public class HttpClientMgr implements ClientMgr {
         long stopTime = System.currentTimeMillis();
         logger.info("ClientMgr({}) is closing...", this.sender.getSenderId());
         if (!messageCache.isEmpty()) {
-            if (httpConfig.isDiscardHttpCacheWhenClosing()) {
-                messageCache.clear();
-            } else {
+            if (!httpConfig.isDiscardHttpCacheWhenClosing()) {
+                // wait for the cached events to be reported, up to the close-wait period
                 long startTime = System.currentTimeMillis();
                 while (!messageCache.isEmpty()) {
                     if (System.currentTimeMillis() - startTime >= 
httpConfig.getHttpCloseWaitPeriodMs()) {
@@ -136,8 +136,43 @@ public class HttpClientMgr implements ClientMgr {
                     }
                     ProxyUtils.sleepSomeTime(100L);
                 }
-                remainCnt = messageCache.size();
-                messageCache.clear();
+            }
+            // force the async report workers to exit
+            existSend = true;
+            // notify the callbacks of the remaining cached events with SDK_CLOSED
+            boolean isSucc;
+            long currentTime;
+            HttpAsyncObj asyncObj;
+            while (!messageCache.isEmpty()) {
+                asyncObj = messageCache.poll();
+                if (asyncObj == null) {
+                    continue;
+                }
+                isSucc = true;
+                currentTime = System.currentTimeMillis();
+                sender.getMetricHolder().addAsyncHttpSucGetMetric(
+                        asyncObj.getHttpEvent().getGroupId(),
+                        asyncObj.getHttpEvent().getStreamId(),
+                        asyncObj.getHttpEvent().getMsgCnt());
+                try {
+                    asyncObj.getCallback().onMessageAck(new 
ProcessResult(ErrorCode.SDK_CLOSED));
+                } catch (Throwable ex) {
+                    isSucc = false;
+                    if (asyncSendExptCnt.shouldPrint()) {
+                        logger.error("HttpAsync({}) callback event exception", 
this.sender.getSenderId(), ex);
+                    }
+                } finally {
+                    asyncIdleCellCnt.release();
+                    if (isSucc) {
+                        
sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(),
+                                asyncObj.getHttpEvent().getStreamId(), 
asyncObj.getHttpEvent().getMsgCnt(),
+                                (currentTime - asyncObj.getRptMs()), 
(System.currentTimeMillis() - currentTime));
+                    } else {
+                        
sender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
+                                asyncObj.getHttpEvent().getGroupId(), 
asyncObj.getHttpEvent().getStreamId(),
+                                asyncObj.getHttpEvent().getMsgCnt(), 
(System.currentTimeMillis() - currentTime));
+                    }
+                }
             }
         }
         workerServices.shutdown();
@@ -443,10 +478,17 @@ public class HttpClientMgr implements ClientMgr {
             // if not shutdown or queue is not empty
             while (!shutDown.get() || !messageCache.isEmpty()) {
                 while (!messageCache.isEmpty()) {
+                    if (existSend) {
+                        break;
+                    }
                     asyncObj = messageCache.poll();
                     if (asyncObj == null) {
                         continue;
                     }
+                    sender.getMetricHolder().addAsyncHttpSucGetMetric(
+                            asyncObj.getHttpEvent().getGroupId(),
+                            asyncObj.getHttpEvent().getStreamId(),
+                            asyncObj.getHttpEvent().getMsgCnt());
                     try {
                         sendMessage(asyncObj.getHttpEvent(), procResult);
                         curTime = System.currentTimeMillis();
@@ -468,6 +510,9 @@ public class HttpClientMgr implements ClientMgr {
                         }
                     }
                 }
+                if (existSend) {
+                    break;
+                }
                 
ProxyUtils.sleepSomeTime(httpConfig.getHttpAsyncWorkerIdleWaitMs());
             }
             logger.info("HttpAsyncReportWorker({}) stopped", this.workerId);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index ec048f54ab..0a9c74b616 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -281,7 +281,7 @@ public class TcpClientMgr implements ClientMgr {
                 }
                 rmvMsgStubInfo(encObject.getMessageId());
             }
-            return procResult.setSuccess();
+            return procResult.isSuccess();
         } else {
             // process sync report
             if (!client.write(clientTerm, encObject, procResult)) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
index 29899e6f58..c521b04cd1 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
@@ -265,6 +265,7 @@ public class TcpNettyClient {
             this.msgSentCnt.incrementAndGet();
             this.channel.writeAndFlush(encodeObject);
             this.msgInflightCnt.incrementAndGet();
+            return procResult.setSuccess();
         } catch (Throwable ex) {
             if (conExptCnt.shouldPrint()) {
                 logger.warn("NettyClient({}) write {} exception",
@@ -274,7 +275,6 @@ public class TcpNettyClient {
         } finally {
             this.rw.readLock().unlock();
         }
-        return procResult.setSuccess();
     }
 
     public void setFrozen(long termId) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index 3720a98ecb..99e0c38289 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -28,6 +28,7 @@ import 
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.metric.MetricDataHolder;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,7 +86,8 @@ public abstract class BaseSender implements ConfigHolder {
         this.baseConfig = configure.clone();
         this.senderFactory = senderFactory;
         this.factoryClusterIdKey = clusterIdKey;
-        this.senderId = configure.getDataRptProtocol() + "-" + 
senderIdGen.incrementAndGet();
+        this.senderId = configure.getDataRptProtocol()
+                + "-" + ProxyUtils.getProcessPid() + "-" + 
senderIdGen.incrementAndGet();
         this.configManager = new ProxyConfigManager(this.senderId, 
this.baseConfig, this);
         this.configManager.setDaemon(true);
         this.metricHolder = new MetricDataHolder(this);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
index 1c6e0a53e6..b764e40b52 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
@@ -67,10 +67,10 @@ public class InLongHttpMsgSender extends BaseSender 
implements HttpMsgSender {
             return httpClientMgr.sendMessage(eventInfo, procResult);
         } finally {
             if (procResult.isSuccess()) {
-                metricHolder.addSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
+                metricHolder.addSyncSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
                         eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
             } else {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+                metricHolder.addSyncFailMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }
@@ -92,8 +92,11 @@ public class InLongHttpMsgSender extends BaseSender 
implements HttpMsgSender {
             }
             return httpClientMgr.asyncSendMessage(new HttpAsyncObj(eventInfo, 
callback), procResult);
         } finally {
-            if (!procResult.isSuccess()) {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+            if (procResult.isSuccess()) {
+                metricHolder.addAsyncHttpSucPutMetric(
+                        eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
+            } else {
+                metricHolder.addAsyncHttpFailPutMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
index 084cd3b8a7..8add9f15f3 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -84,10 +84,10 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
             return processEvent(SendQos.SOURCE_ACK, eventInfo, null, 
procResult);
         } finally {
             if (procResult.isSuccess()) {
-                metricHolder.addSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
+                metricHolder.addSyncSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
                         eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
             } else {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+                metricHolder.addSyncFailMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }
@@ -100,15 +100,14 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        long curTime = System.currentTimeMillis();
         try {
             return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, 
procResult);
         } finally {
             if (procResult.isSuccess()) {
-                metricHolder.addSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
-                        eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
+                metricHolder.addAsyncSucReqMetric(
+                        eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             } else {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+                metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }
@@ -125,10 +124,10 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
             return processEvent(SendQos.NO_ACK, eventInfo, null, procResult);
         } finally {
             if (procResult.isSuccess()) {
-                metricHolder.addSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
+                metricHolder.addSyncSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
                         eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
             } else {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+                metricHolder.addSyncFailMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }
@@ -145,10 +144,10 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
             return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
         } finally {
             if (procResult.isSuccess()) {
-                metricHolder.addSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
+                metricHolder.addSyncSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
                         eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
             } else {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+                metricHolder.addSyncFailMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }
@@ -161,15 +160,14 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        long curTime = System.currentTimeMillis();
         try {
             return processEvent(SendQos.SINK_ACK, eventInfo, callback, 
procResult);
         } finally {
             if (procResult.isSuccess()) {
-                metricHolder.addSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
-                        eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
+                metricHolder.addAsyncSucReqMetric(
+                        eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             } else {
-                metricHolder.addFailMetric(procResult.getErrCode(),
+                metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
         }


Reply via email to