This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 6888ba935 [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
6888ba935 is described below

commit 6888ba9351aa4ba4c6ae8c2388e9ca614a3d303e
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Sun Sep 25 11:22:30 2022 +0800

    [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
---
 .../dataproxy/metrics/DataProxyMetricItemSet.java | 29 ++++++++--------------
 1 file changed, 10 insertions(+), 19 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index cb52823bb..1256c328a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -80,17 +80,6 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
         fillMetricItemsByEvent(event, true, true, isSuccess, size, 0);
     }
 
-    /**
-     * Fill sink metric items by event
-     *
-     * @param event the event object
-     * @param isSuccess whether success read or send
-     * @param size the message size
-     */
-    public void fillSinkReadMetricItemsByEvent(Event event, boolean isSuccess, long size) {
-        fillMetricItemsByEvent(event, false, true, isSuccess, size, 0);
-    }
-
     /**
      * Fill sink send metric items by event
      *
@@ -124,6 +113,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                 event.getHeaders().get(AttributeConstants.STREAM_ID));
         long dataTime = NumberUtils.toLong(
                 event.getHeaders().get(AttributeConstants.DATA_TIME));
+        long msgCount = NumberUtils.toLong(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
         long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         if (isSource) {
@@ -137,16 +128,16 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
         DataProxyMetricItem metricItem = findMetricItem(dimensions);
         if (isReadOp) {
             if (isSuccess) {
-                metricItem.readSuccessCount.incrementAndGet();
+                metricItem.readSuccessCount.addAndGet(msgCount);
                 metricItem.readSuccessSize.addAndGet(size);
             } else {
-                metricItem.readFailCount.incrementAndGet();
+                metricItem.readFailCount.addAndGet(msgCount);
                 metricItem.readFailSize.addAndGet(size);
             }
         } else {
             if (isSuccess) {
-                metricItem.sendSuccessCount.incrementAndGet();
-                metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+                metricItem.sendSuccessCount.addAndGet(msgCount);
+                metricItem.sendSuccessSize.addAndGet(size);
                 if (sendTime > 0) {
                     long currentTime = System.currentTimeMillis();
                     long msgDataTimeL = Long.parseLong(
@@ -158,11 +149,11 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                     metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
                 }
             } else {
-                metricItem.sendFailCount.incrementAndGet();
-                metricItem.sendFailSize.addAndGet(event.getBody().length);
+                metricItem.sendFailCount.addAndGet(msgCount);
+                metricItem.sendFailSize.addAndGet(size);
             }
-            metricItem.sendCount.incrementAndGet();
-            metricItem.sendSize.addAndGet(event.getBody().length);
+            metricItem.sendCount.addAndGet(msgCount);
+            metricItem.sendSize.addAndGet(size);
         }
     }
 