This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 76e739ba3 [INLONG-5788][DataProxy] Fix the incorrect readFailSize metric value (#5792) 76e739ba3 is described below commit 76e739ba34036e08f24be23926e1eaa544d08f94 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Sep 6 15:10:40 2022 +0800 [INLONG-5788][DataProxy] Fix the incorrect readFailSize metric value (#5792) --- .../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 2 +- .../java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java | 4 ++-- .../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index f740f9b82..8127a4305 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -417,7 +417,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, "")); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); metricItem.readSuccessCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + metricItem.readSuccessSize.addAndGet(event.getBody().length); } } else { status = Status.BACKOFF; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java index 521f3c24f..fd4788b09 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java @@ -596,8 +596,8 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, ""); } DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readFailCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + metricItem.readSuccessCount.incrementAndGet(); + metricItem.readSuccessSize.addAndGet(event.getBody().length); } } else { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 3cf3e27ae..2d1374d37 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -285,7 +285,7 @@ public class TubeSink extends AbstractSink implements Configurable { cachedMsgCnt.incrementAndGet(); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); metricItem.readSuccessCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + metricItem.readSuccessSize.addAndGet(event.getBody().length); } else { tx.rollback(); //logger.info("[{}] Channel --> Queue(has no enough space,current code point) "