This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3fbef9072 [INLONG-5788][DataProxy] Fix the incorrect readFailSize metric value (#5792) 3fbef9072 is described below commit 3fbef9072b1383f48022d3cabb3484f315d8c3da Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Sep 6 15:10:40 2022 +0800 [INLONG-5788][DataProxy] Fix the incorrect readFailSize metric value (#5792) --- .../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 2 +- .../java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java | 4 ++-- .../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 7ba7724e8..71aceb9c5 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -418,7 +418,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(TOPIC, "")); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); metricItem.readSuccessCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + metricItem.readSuccessSize.addAndGet(event.getBody().length); } } else { status = Status.BACKOFF; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java index def513ce5..a0e996029 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java @@ -597,8 +597,8 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, ""); } DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readFailCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + metricItem.readSuccessCount.incrementAndGet(); + metricItem.readSuccessSize.addAndGet(event.getBody().length); } } else { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 74bdf719f..0f96aa0b9 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -287,7 +287,7 @@ public class TubeSink extends AbstractSink implements Configurable { cachedMsgCnt.incrementAndGet(); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); metricItem.readSuccessCount.incrementAndGet(); - metricItem.readFailSize.addAndGet(event.getBody().length); + metricItem.readSuccessSize.addAndGet(event.getBody().length); } else { tx.rollback(); //logger.info("[{}] Channel --> Queue(has no enough space,current code point) "