This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 63c28583ff [INLONG-11743][SDK] Adjustment of metric statistics (#11744) 63c28583ff is described below commit 63c28583ffe568ae25d21e2e3a860ab44d5926f9 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Feb 11 14:09:13 2025 +0800 [INLONG-11743][SDK] Adjustment of metric statistics (#11744) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/sdk/dataproxy/metric/MetaSyncInfo.java | 4 +- .../sdk/dataproxy/metric/MetricDataHolder.java | 216 +++++++++++++++++---- .../inlong/sdk/dataproxy/metric/TimeCostInfo.java | 4 +- .../inlong/sdk/dataproxy/metric/TrafficInfo.java | 151 ++++++++++---- .../sdk/dataproxy/network/http/HttpClientMgr.java | 55 +++++- .../sdk/dataproxy/network/tcp/TcpClientMgr.java | 2 +- .../sdk/dataproxy/network/tcp/TcpNettyClient.java | 2 +- .../inlong/sdk/dataproxy/sender/BaseSender.java | 4 +- .../dataproxy/sender/http/InLongHttpMsgSender.java | 11 +- .../dataproxy/sender/tcp/InLongTcpMsgSender.java | 26 ++- 10 files changed, 367 insertions(+), 108 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java index 2539c7ae53..b574eff2ac 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java @@ -46,12 +46,12 @@ public class MetaSyncInfo { public void getAndResetValue(StringBuilder strBuff) { if (syncErrInfo.isEmpty()) { - strBuff.append("\"mSync\":{\"errT\":{},"); + strBuff.append("\"ms\":{\"errT\":{},"); syncCostMs.getAndResetValue(strBuff); strBuff.append("}"); } else { long curCnt = 0; - strBuff.append("\"mSync\":{\"errT\":{"); + strBuff.append("\"ms\":{\"errT\":{"); for (Map.Entry<Integer, LongAdder> entry : syncErrInfo.entrySet()) { if (curCnt++ > 0) { strBuff.append(","); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java index 9b8e19560c..4b16f62c8d 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java @@ -40,7 +40,7 @@ public class MetricDataHolder implements Runnable { private static final String DEFAULT_KEY_SPLITTER = "#"; private static final Logger logger = LoggerFactory.getLogger(MetricDataHolder.class); - private static final LogCounter exceptCnt = new LogCounter(10, 100000, 60 * 1000L); + private static final LogCounter exceptCnt = new LogCounter(5, 100000, 60 * 1000L); private final MetricConfig metricConfig; private final BaseSender sender; @@ -95,7 +95,7 @@ public class MetricDataHolder implements Runnable { } public void addMetaSyncMetric(int errCode, long syncCostMs) { - if (!this.started || !this.metricConfig.isEnableMetric()) { + if (!this.metricConfig.isEnableMetric()) { return; } MetricInfoUnit selectedUnit = metricUnits[itemIndex]; @@ -107,28 +107,56 @@ public class MetricDataHolder implements Runnable { } } - public void addSucMetric(String groupId, String streamId, int msgCnt, long costMs) { - if (!this.started || !this.metricConfig.isEnableMetric()) { + public void addSyncSucMetric(String groupId, String streamId, int msgCnt, long costMs) { + if (!this.metricConfig.isEnableMetric()) { return; } MetricInfoUnit selectedUnit = metricUnits[itemIndex]; selectedUnit.refCnt.incrementAndGet(); try { - selectedUnit.addSucMsgInfo(groupId, + selectedUnit.addSyncSendSucInfo(groupId, (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, costMs); } finally { selectedUnit.refCnt.decrementAndGet(); } } - public void addFailMetric(int errCode, String groupId, String streamId, int msgCnt) { - if (!this.started || !this.metricConfig.isEnableMetric()) { + public void addSyncFailMetric(int errCode, String groupId, String streamId, int msgCnt) { + if (!this.metricConfig.isEnableMetric()) { return; } MetricInfoUnit selectedUnit = metricUnits[itemIndex]; selectedUnit.refCnt.incrementAndGet(); try { - selectedUnit.addFailMsgInfo(groupId, + selectedUnit.addSyncSendFailInfo(groupId, + (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, errCode); + } finally { + selectedUnit.refCnt.decrementAndGet(); + } + } + + public void addAsyncSucReqMetric(String groupId, String streamId, int msgCnt) { + if (!this.metricConfig.isEnableMetric()) { + return; + } + MetricInfoUnit selectedUnit = metricUnits[itemIndex]; + selectedUnit.refCnt.incrementAndGet(); + try { + selectedUnit.addAsyncSendSucInfo(groupId, + (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt); + } finally { + selectedUnit.refCnt.decrementAndGet(); + } + } + + public void addAsyncFailReqMetric(int errCode, String groupId, String streamId, int msgCnt) { + if (!this.metricConfig.isEnableMetric()) { + return; + } + MetricInfoUnit selectedUnit = metricUnits[itemIndex]; + selectedUnit.refCnt.incrementAndGet(); + try { + selectedUnit.addAsyncSendFailInfo(groupId, (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, errCode); } finally { selectedUnit.refCnt.decrementAndGet(); @@ -136,13 +164,13 @@ public class MetricDataHolder implements Runnable { } public void addCallbackSucMetric(String groupId, String streamId, int msgCnt, long costMs, long callDurMs) { - if (!this.started || !this.metricConfig.isEnableMetric()) { + if (!this.metricConfig.isEnableMetric()) { return; } MetricInfoUnit selectedUnit = metricUnits[itemIndex]; selectedUnit.refCnt.incrementAndGet(); try { - selectedUnit.addSucMsgInfo(groupId, + selectedUnit.addAsyncRspSucInfo(groupId, (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, costMs, callDurMs); } finally { selectedUnit.refCnt.decrementAndGet(); @@ -150,19 +178,61 @@ public class MetricDataHolder implements Runnable { } public void addCallbackFailMetric(int errCode, String groupId, String streamId, int msgCnt, long costMs) { - if (!this.started || !this.metricConfig.isEnableMetric()) { + if (!this.metricConfig.isEnableMetric()) { return; } MetricInfoUnit selectedUnit = metricUnits[itemIndex]; selectedUnit.refCnt.incrementAndGet(); try { - selectedUnit.addFailMsgInfo(groupId, + selectedUnit.addAsyncRspFailInfo(groupId, (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, errCode, costMs); } finally { selectedUnit.refCnt.decrementAndGet(); } } + public void addAsyncHttpSucPutMetric(String groupId, String streamId, int msgCnt) { + if (!this.metricConfig.isEnableMetric()) { + return; + } + MetricInfoUnit selectedUnit = metricUnits[itemIndex]; + selectedUnit.refCnt.incrementAndGet(); + try { + selectedUnit.addAsyncHttpPutSucInfo(groupId, + (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt); + } finally { + selectedUnit.refCnt.decrementAndGet(); + } + } + + public void addAsyncHttpFailPutMetric(int errCode, String groupId, String streamId, int msgCnt) { + if (!this.metricConfig.isEnableMetric()) { + return; + } + MetricInfoUnit selectedUnit = metricUnits[itemIndex]; + selectedUnit.refCnt.incrementAndGet(); + try { + selectedUnit.addAsyncHttpPutFailInfo(groupId, + (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt, errCode); + } finally { + selectedUnit.refCnt.decrementAndGet(); + } + } + + public void addAsyncHttpSucGetMetric(String groupId, String streamId, int msgCnt) { + if (!this.metricConfig.isEnableMetric()) { + return; + } + MetricInfoUnit selectedUnit = metricUnits[itemIndex]; + selectedUnit.refCnt.incrementAndGet(); + try { + selectedUnit.addAsyncHttpGetSucInfo(groupId, + (this.metricConfig.isMaskStreamId() ? "" : streamId), msgCnt); + } finally { + selectedUnit.refCnt.decrementAndGet(); + } + } + private void outputMetricData(boolean forceOutput, long reportTime, int readIndex) { if (!this.metricConfig.isEnableMetric()) { return; @@ -180,11 +250,7 @@ public class MetricDataHolder implements Runnable { || (System.currentTimeMillis() - startTime >= 5000L)) { break; } - try { - Thread.sleep(3); - } catch (InterruptedException e) { - break; - } + ProxyUtils.sleepSomeTime(80); } while (selectedUnit.refCnt.get() > 0); if (!forceOutput && !this.started) { logger.info("Metric DataHolder({}) closed, stop output metric info", @@ -218,7 +284,47 @@ public class MetricDataHolder implements Runnable { protected final ConcurrentHashMap<String, TrafficInfo> trafficMap = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<Integer, LongAdder> errCodeMap = new ConcurrentHashMap<>(); - public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long costMs) { + public void addSyncSendSucInfo(String groupId, String streamId, int msgCnt, long costMs) { + String recordKey = getKeyStringByConfig(groupId, streamId); + TrafficInfo trafficInfo = this.trafficMap.get(recordKey); + if (trafficInfo == null) { + TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId); + trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo); + if (trafficInfo == null) { + trafficInfo = tmpInfo; + } + } + trafficInfo.addSyncSucMsgInfo(msgCnt, costMs); + } + + public void addSyncSendFailInfo(String groupId, String streamId, int msgCnt, int errCode) { + String recordKey = getKeyStringByConfig(groupId, streamId); + TrafficInfo trafficInfo = this.trafficMap.get(recordKey); + if (trafficInfo == null) { + TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId); + trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo); + if (trafficInfo == null) { + trafficInfo = tmpInfo; + } + } + trafficInfo.addSyncFailMsgInfo(msgCnt); + addSendErrCodeInfo(errCode); + } + + public void addAsyncSendSucInfo(String groupId, String streamId, int msgCnt) { + String recordKey = getKeyStringByConfig(groupId, streamId); + TrafficInfo trafficInfo = this.trafficMap.get(recordKey); + if (trafficInfo == null) { + TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId); + trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo); + if (trafficInfo == null) { + trafficInfo = tmpInfo; + } + } + trafficInfo.addAsyncSucSendInfo(msgCnt); + } + + public void addAsyncSendFailInfo(String groupId, String streamId, int msgCnt, int errCode) { String recordKey = getKeyStringByConfig(groupId, streamId); TrafficInfo trafficInfo = this.trafficMap.get(recordKey); if (trafficInfo == null) { @@ -228,10 +334,11 @@ public class MetricDataHolder implements Runnable { trafficInfo = tmpInfo; } } - trafficInfo.addSucMsgInfo(msgCnt, costMs); + trafficInfo.addAsyncFailSendInfo(msgCnt); + addSendErrCodeInfo(errCode); } - public void addSucMsgInfo(String groupId, String streamId, int msgCnt, long sdCostMs, long cbCostMs) { + public void addAsyncHttpPutSucInfo(String groupId, String streamId, int msgCnt) { String recordKey = getKeyStringByConfig(groupId, streamId); TrafficInfo trafficInfo = this.trafficMap.get(recordKey); if (trafficInfo == null) { @@ -241,10 +348,10 @@ public class MetricDataHolder implements Runnable { trafficInfo = tmpInfo; } } - trafficInfo.addSucMsgInfo(msgCnt, sdCostMs, cbCostMs); + trafficInfo.addAsyncHttpSucPutInfo(msgCnt); } - public void addFailMsgInfo(String groupId, String streamId, int msgCnt, int errCode) { + public void addAsyncHttpPutFailInfo(String groupId, String streamId, int msgCnt, int errCode) { String recordKey = getKeyStringByConfig(groupId, streamId); TrafficInfo trafficInfo = this.trafficMap.get(recordKey); if (trafficInfo == null) { @@ -254,12 +361,37 @@ public class MetricDataHolder implements Runnable { trafficInfo = tmpInfo; } } - trafficInfo.addFailMsgInfo(msgCnt); + trafficInfo.addAsyncHttpFailPutInfo(msgCnt); addSendErrCodeInfo(errCode); } - public void addFailMsgInfo(String groupId, String streamId, - int msgCnt, int errCode, long cbCostMs) { + public void addAsyncHttpGetSucInfo(String groupId, String streamId, int msgCnt) { + String recordKey = getKeyStringByConfig(groupId, streamId); + TrafficInfo trafficInfo = this.trafficMap.get(recordKey); + if (trafficInfo == null) { + TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId); + trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo); + if (trafficInfo == null) { + trafficInfo = tmpInfo; + } + } + trafficInfo.addAsyncHttpSucGetInfo(msgCnt); + } + + public void addAsyncRspSucInfo(String groupId, String streamId, int msgCnt, long sdCostMs, long cbCostMs) { + String recordKey = getKeyStringByConfig(groupId, streamId); + TrafficInfo trafficInfo = this.trafficMap.get(recordKey); + if (trafficInfo == null) { + TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId); + trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo); + if (trafficInfo == null) { + trafficInfo = tmpInfo; + } + } + trafficInfo.addAsyncSucRspInfo(msgCnt, sdCostMs, cbCostMs); + } + + public void addAsyncRspFailInfo(String groupId, String streamId, int msgCnt, int errCode, long cbCostMs) { String recordKey = getKeyStringByConfig(groupId, streamId); TrafficInfo trafficInfo = this.trafficMap.get(recordKey); if (trafficInfo == null) { @@ -269,11 +401,11 @@ public class MetricDataHolder implements Runnable { trafficInfo = tmpInfo; } } - trafficInfo.addFailMsgInfo(msgCnt, cbCostMs); + trafficInfo.addAsyncFailRspInfo(msgCnt, cbCostMs); addSendErrCodeInfo(errCode); } - public void addSendErrCodeInfo(int errCode) { + private void addSendErrCodeInfo(int errCode) { LongAdder longCount = this.errCodeMap.get(errCode); if (longCount == null) { LongAdder tmpCount = new LongAdder(); @@ -288,7 +420,7 @@ public class MetricDataHolder implements Runnable { public void getAndResetValue(StringBuilder strBuff) { int count = 0; metaSyncInfo.getAndResetValue(strBuff); - strBuff.append(",\"m\":["); + strBuff.append(",\"tr\":["); for (Map.Entry<String, TrafficInfo> entry : trafficMap.entrySet()) { if (count++ > 0) { strBuff.append(","); @@ -325,9 +457,9 @@ public class MetricDataHolder implements Runnable { .append(",\"lrT\":").append(lstReportTime) .append(","); metricUnit.getAndResetValue(strBuff); - strBuff.append(",\"s\":{\"tNodes\":").append(sender.getProxyNodeCnt()) - .append(",\"aNodes\":").append(sender.getActiveNodeCnt()) - .append(",\"ifReqs\":").append(sender.getInflightMsgCnt()) + strBuff.append(",\"s\":{\"tNs\":").append(sender.getProxyNodeCnt()) + .append(",\"aNs\":").append(sender.getActiveNodeCnt()) + .append(",\"ifRs\":").append(sender.getInflightMsgCnt()) .append("},\"c\":{\"aC\":").append(sender.getConfigure().getAliveConnections()) .append(",\"rP\":\"").append(sender.getConfigure().getDataRptProtocol()) .append("\",\"rG\":\"").append(sender.getConfigure().getRegionName()) @@ -335,22 +467,22 @@ public class MetricDataHolder implements Runnable { if (sender instanceof TcpMsgSender) { TcpMsgSenderConfig tcpConfig = (TcpMsgSenderConfig) sender.getConfigure(); strBuff.append(",\"mT\":").append(tcpConfig.getSdkMsgType().getValue()) - .append(",\"comp\":").append(tcpConfig.isEnableDataCompress()) - .append(",\"mCLen\":").append(tcpConfig.getMinCompEnableLength()) - .append(",\"lfSep\":").append(tcpConfig.isSeparateEventByLF()) + .append(",\"cp\":").append(tcpConfig.isEnableDataCompress()) + .append(",\"mCp\":").append(tcpConfig.getMinCompEnableLength()) + .append(",\"lf\":").append(tcpConfig.isSeparateEventByLF()) .append(",\"nWk\":").append(tcpConfig.getNettyWorkerThreadNum()) .append(",\"sB\":").append(tcpConfig.getSendBufferSize()) .append(",\"rB\":").append(tcpConfig.getRcvBufferSize()) - .append(",\"cOut\":").append(tcpConfig.getConnectTimeoutMs()) - .append(",\"rOut\":").append(tcpConfig.getRequestTimeoutMs()) - .append(",\"syncOut\":").append(tcpConfig.getMaxAllowedSyncMsgTimeoutCnt()); + .append(",\"cOt\":").append(tcpConfig.getConnectTimeoutMs()) + .append(",\"rOt\":").append(tcpConfig.getRequestTimeoutMs()) + .append(",\"syOt\":").append(tcpConfig.getMaxAllowedSyncMsgTimeoutCnt()); } else { HttpMsgSenderConfig httpConfig = (HttpMsgSenderConfig) sender.getConfigure(); - strBuff.append(",\"iHttps\":").append(httpConfig.isRptDataByHttps()) - .append(",\"sOut\":").append(httpConfig.getHttpSocketTimeoutMs()) - .append(",\"cOut\":").append(httpConfig.getHttpConTimeoutMs()) - .append(",\"asyWk\":").append(httpConfig.getHttpAsyncRptWorkerNum()) - .append(",\"asyCh\":").append(httpConfig.getHttpAsyncRptCacheSize()); + strBuff.append(",\"iHs\":").append(httpConfig.isRptDataByHttps()) + .append(",\"sOt\":").append(httpConfig.getHttpSocketTimeoutMs()) + .append(",\"cOt\":").append(httpConfig.getHttpConTimeoutMs()) + .append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum()) + .append(",\"aC\":").append(httpConfig.getHttpAsyncRptCacheSize()); } String content = strBuff.append("}}").toString(); strBuff.delete(0, strBuff.length()); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java index 1bd33306a1..5a5ad7deb5 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java @@ -73,10 +73,10 @@ public class TimeCostInfo { long curTotalCnt = totalCnt.sumThenReset(); if (curTotalCnt == 0) { strBuff.append("\"").append(name) - .append("\":{\"bucketT\":{},\"min\":0,\"max\":0,\"avgT\":0}"); + .append("\":{\"bkts\":{},\"min\":0,\"max\":0,\"avgT\":0}"); } else { long bucketCnt = 0; - strBuff.append("\"").append(name).append("\":{\"bucketT\":{"); + strBuff.append("\"").append(name).append("\":{\"bkts\":{"); for (Map.Entry<String, LongAdder> entry : sendTimeBucketT.entrySet()) { if (bucketCnt++ > 0) { strBuff.append(","); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java index 2622c47cef..759148f4ff 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java @@ -27,16 +27,44 @@ public class TrafficInfo { private final String groupId; // sId private final String streamId; - // sPkg - private final LongAdder sendPkgCount = new LongAdder(); - // sMsg - private final LongAdder sendMsgCount = new LongAdder(); - // fMsg - private final LongAdder failedPgkCount = new LongAdder(); - // fMsg - private final LongAdder failedMsgCount = new LongAdder(); - // sucMs: send success time cost in Ms - private final TimeCostInfo sendCostMs = new TimeCostInfo("sucMs"); + + // sPs + private final LongAdder syncSendPkgCnt = new LongAdder(); + // sMs + private final LongAdder syncSendMsgCount = new LongAdder(); + // sPf + private final LongAdder syncFailedPgkCount = new LongAdder(); + // sucMs: sync call time cost in Ms + private final TimeCostInfo syncSendCostMs = new TimeCostInfo("sSucMs"); + // http + // apPs + private final LongAdder aSyncHttpPkgPutCnt = new LongAdder(); + // apMs + private final LongAdder aSyncHttpMsgPutCnt = new LongAdder(); + // apPf + private final LongAdder aSyncHttpFailPkgPutCnt = new LongAdder(); + // agPs + private final LongAdder aSyncHttpPkgGetCnt = new LongAdder(); + // agMs + private final LongAdder aSyncHttpMsgGetCnt = new LongAdder(); + // aPs + private final LongAdder aSyncSendPkgCount = new LongAdder(); + // aMs + private final LongAdder aSyncSendMsgCount = new LongAdder(); + // aPf + private final LongAdder aSyncFailedPgkCnt = new LongAdder(); + // arPs + private final LongAdder aRcvPkgCount = new LongAdder(); + // arMs + private final LongAdder aRcvMsgCount = new LongAdder(); + // arPf + private final LongAdder aRcvFailedPgkCount = new LongAdder(); + // arMf + private final LongAdder aRcvFailedMsgCount = new LongAdder(); + // sucMs: async received time cost in Ms + private final TimeCostInfo asyncSucCostMs = new TimeCostInfo("aSucMs"); + // call back call count + private final LongAdder cbCallCount = new LongAdder(); // cbMs: call back time cost in Ms private final TimeCostInfo callbackCostMs = new TimeCostInfo("cbMs"); @@ -45,27 +73,51 @@ public class TrafficInfo { this.streamId = streamId; } - public void addSucMsgInfo(int msgCnt, long costMs) { - sendPkgCount.add(1); - sendMsgCount.add(msgCnt); - sendCostMs.addTimeCostInMs(costMs); + public void addSyncSucMsgInfo(int msgCnt, long costMs) { + syncSendPkgCnt.increment(); + syncSendMsgCount.add(msgCnt); + syncSendCostMs.addTimeCostInMs(costMs); } - public void addSucMsgInfo(int msgCnt, long sdCostMs, long cbCostMs) { - sendPkgCount.add(1); - sendMsgCount.add(msgCnt); - sendCostMs.addTimeCostInMs(sdCostMs); - callbackCostMs.addTimeCostInMs(cbCostMs); + public void addSyncFailMsgInfo(int msgCnt) { + syncFailedPgkCount.increment(); + } + + public void addAsyncSucSendInfo(int msgCnt) { + aSyncSendPkgCount.increment(); + aSyncSendMsgCount.add(msgCnt); } - public void addFailMsgInfo(int msgCnt) { - failedPgkCount.add(1); - failedMsgCount.add(msgCnt); + public void addAsyncFailSendInfo(int msgCnt) { + aSyncFailedPgkCnt.increment(); + } + + public void addAsyncHttpSucPutInfo(int msgCnt) { + aSyncHttpPkgPutCnt.increment(); + aSyncHttpMsgPutCnt.add(msgCnt); + } + + public void addAsyncHttpFailPutInfo(int msgCnt) { + aSyncHttpFailPkgPutCnt.increment(); + } + + public void addAsyncHttpSucGetInfo(int msgCnt) { + aSyncHttpPkgGetCnt.increment(); + aSyncHttpMsgGetCnt.add(msgCnt); + } + + public void addAsyncSucRspInfo(int msgCnt, long sdCostMs, long cbCostMs) { + aRcvPkgCount.increment(); + aRcvMsgCount.add(msgCnt); + asyncSucCostMs.addTimeCostInMs(sdCostMs); + cbCallCount.increment(); + callbackCostMs.addTimeCostInMs(cbCostMs); } - public void addFailMsgInfo(int msgCnt, long cbCostMs) { - failedPgkCount.add(1); - failedMsgCount.add(msgCnt); + public void addAsyncFailRspInfo(int msgCnt, long cbCostMs) { + aRcvFailedPgkCount.increment(); + aRcvFailedMsgCount.add(msgCnt); + cbCallCount.increment(); callbackCostMs.addTimeCostInMs(cbCostMs); } @@ -74,23 +126,50 @@ public class TrafficInfo { if (StringUtils.isNotBlank(this.streamId)) { strBuff.append("\",\"sId\":\"").append(this.streamId); } - strBuff.append("\",\"sPkg\":").append(sendPkgCount.sumThenReset()) - .append(",\"sMsg\":").append(sendMsgCount.sumThenReset()) - .append(",\"fPkg\":").append(failedPgkCount.sumThenReset()) - .append(",\"fMsg\":").append(failedMsgCount.sumThenReset()) - .append(","); - this.sendCostMs.getAndResetValue(strBuff); + strBuff.append("\",\"sPs\":").append(syncSendPkgCnt.sumThenReset()) + .append(",\"sPf\":").append(syncFailedPgkCount.sumThenReset()) + .append(",\"sMs\":").append(syncSendMsgCount.sumThenReset()).append(","); + this.syncSendCostMs.getAndResetValue(strBuff); + strBuff.append(",\"apPs\":").append(aSyncHttpPkgPutCnt.sumThenReset()) + .append(",\"apPf\":").append(aSyncHttpFailPkgPutCnt.sumThenReset()) + .append(",\"agPs\":").append(aSyncHttpPkgGetCnt.sumThenReset()) + .append(",\"aPs\":").append(aSyncSendPkgCount.sumThenReset()) + .append(",\"aPf\":").append(aSyncFailedPgkCnt.sumThenReset()) + .append(",\"arPs\":").append(aRcvPkgCount.sumThenReset()) + .append(",\"arPf\":").append(aRcvFailedPgkCount.sumThenReset()) + .append(",\"cbCt\":").append(cbCallCount.sumThenReset()) + .append(",\"apMs\":").append(aSyncHttpMsgPutCnt.sumThenReset()) + .append(",\"agMs\":").append(aSyncHttpMsgGetCnt.sumThenReset()) + .append(",\"aMs\":").append(aSyncSendMsgCount.sumThenReset()) + .append(",\"arMs\":").append(aRcvMsgCount.sumThenReset()) + .append(",\"arMf\":").append(aRcvFailedMsgCount.sumThenReset()).append(","); + this.asyncSucCostMs.getAndResetValue(strBuff); strBuff.append(","); this.callbackCostMs.getAndResetValue(strBuff); strBuff.append("}"); } public void clear() { - this.sendPkgCount.reset(); - this.sendMsgCount.reset(); - this.failedPgkCount.reset(); - this.failedMsgCount.reset(); - this.sendCostMs.clear(); + this.syncSendPkgCnt.reset(); + this.syncSendMsgCount.reset(); + this.syncFailedPgkCount.reset(); + this.syncSendCostMs.clear(); + // + this.aSyncSendPkgCount.reset(); + this.aSyncSendMsgCount.reset(); + this.aSyncFailedPgkCnt.reset(); + this.aSyncHttpPkgPutCnt.reset(); + this.aSyncHttpMsgPutCnt.reset(); + this.aSyncHttpFailPkgPutCnt.reset(); + this.aSyncHttpPkgGetCnt.reset(); + this.aSyncHttpMsgGetCnt.reset(); + // + this.aRcvPkgCount.reset(); + this.aRcvMsgCount.reset(); + this.aRcvFailedPgkCount.reset(); + this.aRcvFailedMsgCount.reset(); + this.asyncSucCostMs.clear(); + this.cbCallCount.reset(); this.callbackCostMs.clear(); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java index bc4d2884c3..1700e394d5 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java @@ -80,6 +80,7 @@ public class HttpClientMgr implements ClientMgr { private final LinkedBlockingQueue<HttpAsyncObj> messageCache; private final Semaphore asyncIdleCellCnt; private final ExecutorService workerServices = Executors.newCachedThreadPool(); + private volatile boolean existSend = false; private final AtomicBoolean shutDown = new AtomicBoolean(false); // meta info private ConcurrentHashMap<String, HostInfo> usingNodeMaps = new ConcurrentHashMap<>(); @@ -126,9 +127,8 @@ public class HttpClientMgr implements ClientMgr { long stopTime = System.currentTimeMillis(); logger.info("ClientMgr({}) is closing...", this.sender.getSenderId()); if (!messageCache.isEmpty()) { - if (httpConfig.isDiscardHttpCacheWhenClosing()) { - messageCache.clear(); - } else { + if (!httpConfig.isDiscardHttpCacheWhenClosing()) { + // wait last event report long startTime = System.currentTimeMillis(); while (!messageCache.isEmpty()) { if (System.currentTimeMillis() - startTime >= httpConfig.getHttpCloseWaitPeriodMs()) { @@ -136,8 +136,43 @@ public class HttpClientMgr implements ClientMgr { } ProxyUtils.sleepSomeTime(100L); } - remainCnt = messageCache.size(); - messageCache.clear(); + } + // force exist report + existSend = true; + // call back result + boolean isSucc; + long currentTime; + HttpAsyncObj asyncObj; + while (!messageCache.isEmpty()) { + asyncObj = messageCache.poll(); + if (asyncObj == null) { + continue; + } + isSucc = true; + currentTime = System.currentTimeMillis(); + sender.getMetricHolder().addAsyncHttpSucGetMetric( + asyncObj.getHttpEvent().getGroupId(), + asyncObj.getHttpEvent().getStreamId(), + asyncObj.getHttpEvent().getMsgCnt()); + try { + asyncObj.getCallback().onMessageAck(new ProcessResult(ErrorCode.SDK_CLOSED)); + } catch (Throwable ex) { + isSucc = false; + if (asyncSendExptCnt.shouldPrint()) { + logger.error("HttpAsync({}) callback event exception", this.sender.getSenderId(), ex); + } + } finally { + asyncIdleCellCnt.release(); + if (isSucc) { + sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(), + asyncObj.getHttpEvent().getStreamId(), asyncObj.getHttpEvent().getMsgCnt(), + (currentTime - asyncObj.getRptMs()), (System.currentTimeMillis() - currentTime)); + } else { + sender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(), + asyncObj.getHttpEvent().getGroupId(), asyncObj.getHttpEvent().getStreamId(), + asyncObj.getHttpEvent().getMsgCnt(), (System.currentTimeMillis() - currentTime)); + } + } } } workerServices.shutdown(); @@ -443,10 +478,17 @@ public class HttpClientMgr implements ClientMgr { // if not shutdown or queue is not empty while (!shutDown.get() || !messageCache.isEmpty()) { while (!messageCache.isEmpty()) { + if (existSend) { + break; + } asyncObj = messageCache.poll(); if (asyncObj == null) { continue; } + sender.getMetricHolder().addAsyncHttpSucGetMetric( + asyncObj.getHttpEvent().getGroupId(), + asyncObj.getHttpEvent().getStreamId(), + asyncObj.getHttpEvent().getMsgCnt()); try { sendMessage(asyncObj.getHttpEvent(), procResult); curTime = System.currentTimeMillis(); @@ -468,6 +510,9 @@ public class HttpClientMgr implements ClientMgr { } } } + if (existSend) { + break; + } ProxyUtils.sleepSomeTime(httpConfig.getHttpAsyncWorkerIdleWaitMs()); } logger.info("HttpAsyncReportWorker({}) stopped", this.workerId); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java index ec048f54ab..0a9c74b616 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java @@ -281,7 +281,7 @@ public class TcpClientMgr implements ClientMgr { } rmvMsgStubInfo(encObject.getMessageId()); } - return procResult.setSuccess(); + return procResult.isSuccess(); } else { // process sync report if (!client.write(clientTerm, encObject, procResult)) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java index 29899e6f58..c521b04cd1 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java @@ -265,6 +265,7 @@ public class TcpNettyClient { this.msgSentCnt.incrementAndGet(); this.channel.writeAndFlush(encodeObject); this.msgInflightCnt.incrementAndGet(); + return procResult.setSuccess(); } catch (Throwable ex) { if (conExptCnt.shouldPrint()) { logger.warn("NettyClient({}) write {} exception", @@ -274,7 +275,6 @@ public class TcpNettyClient { } finally { this.rw.readLock().unlock(); } - return procResult.setSuccess(); } public void setFrozen(long termId) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java index 3720a98ecb..99e0c38289 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java @@ -28,6 +28,7 @@ import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import org.apache.inlong.sdk.dataproxy.metric.MetricDataHolder; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +86,8 @@ public abstract class BaseSender implements ConfigHolder { this.baseConfig = configure.clone(); this.senderFactory = senderFactory; this.factoryClusterIdKey = clusterIdKey; - this.senderId = configure.getDataRptProtocol() + "-" + senderIdGen.incrementAndGet(); + this.senderId = configure.getDataRptProtocol() + + "-" + ProxyUtils.getProcessPid() + "-" + senderIdGen.incrementAndGet(); this.configManager = new ProxyConfigManager(this.senderId, this.baseConfig, this); this.configManager.setDaemon(true); this.metricHolder = new MetricDataHolder(this); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java index 1c6e0a53e6..b764e40b52 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java @@ -67,10 +67,10 @@ public class InLongHttpMsgSender extends BaseSender implements HttpMsgSender { return httpClientMgr.sendMessage(eventInfo, procResult); } finally { if (procResult.isSuccess()) { - metricHolder.addSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), + metricHolder.addSyncSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt(), (System.currentTimeMillis() - curTime)); } else { - metricHolder.addFailMetric(procResult.getErrCode(), + metricHolder.addSyncFailMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } } @@ -92,8 +92,11 @@ public class InLongHttpMsgSender extends BaseSender implements HttpMsgSender { } return httpClientMgr.asyncSendMessage(new HttpAsyncObj(eventInfo, callback), procResult); } finally { - if (!procResult.isSuccess()) { - metricHolder.addFailMetric(procResult.getErrCode(), + if (procResult.isSuccess()) { + metricHolder.addAsyncHttpSucPutMetric( + eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); + } else { + metricHolder.addAsyncHttpFailPutMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java index 084cd3b8a7..8add9f15f3 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java @@ -84,10 +84,10 @@ public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender { return processEvent(SendQos.SOURCE_ACK, eventInfo, null, procResult); } finally { if (procResult.isSuccess()) { - metricHolder.addSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), + metricHolder.addSyncSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt(), (System.currentTimeMillis() - curTime)); } else { - metricHolder.addFailMetric(procResult.getErrCode(), + metricHolder.addSyncFailMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } } @@ -100,15 +100,14 @@ public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender { if (!this.isStarted()) { return procResult.setFailResult(ErrorCode.SDK_CLOSED); } - long curTime = System.currentTimeMillis(); try { return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, procResult); } finally { if (procResult.isSuccess()) { - metricHolder.addSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), - eventInfo.getMsgCnt(), (System.currentTimeMillis() - curTime)); + metricHolder.addAsyncSucReqMetric( + eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } else { - metricHolder.addFailMetric(procResult.getErrCode(), + metricHolder.addAsyncFailReqMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } } @@ -125,10 +124,10 @@ public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender { return processEvent(SendQos.NO_ACK, eventInfo, null, procResult); } finally { if (procResult.isSuccess()) { - metricHolder.addSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), + metricHolder.addSyncSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt(), (System.currentTimeMillis() - curTime)); } else { - metricHolder.addFailMetric(procResult.getErrCode(), + metricHolder.addSyncFailMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } } @@ -145,10 +144,10 @@ public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender { return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult); } finally { if (procResult.isSuccess()) { - metricHolder.addSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), + metricHolder.addSyncSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt(), (System.currentTimeMillis() - curTime)); } else { - metricHolder.addFailMetric(procResult.getErrCode(), + metricHolder.addSyncFailMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } } @@ -161,15 +160,14 @@ public class InLongTcpMsgSender extends BaseSender implements TcpMsgSender { if (!this.isStarted()) { return procResult.setFailResult(ErrorCode.SDK_CLOSED); } - long curTime = System.currentTimeMillis(); try { return processEvent(SendQos.SINK_ACK, eventInfo, callback, procResult); } finally { if (procResult.isSuccess()) { - metricHolder.addSucMetric(eventInfo.getGroupId(), eventInfo.getStreamId(), - eventInfo.getMsgCnt(), (System.currentTimeMillis() - curTime)); + metricHolder.addAsyncSucReqMetric( + eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } else { - metricHolder.addFailMetric(procResult.getErrCode(), + metricHolder.addAsyncFailReqMetric(procResult.getErrCode(), eventInfo.getGroupId(), eventInfo.getStreamId(), eventInfo.getMsgCnt()); } }