This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c2c2c3069 [INLONG-6005][DataProxy] Fix resend read values (#6006) c2c2c3069 is described below commit c2c2c3069d325c4cf9040c9a96452757d6a0b93b Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com> AuthorDate: Fri Sep 23 18:43:51 2022 +0800 [INLONG-6005][DataProxy] Fix resend read values (#6006) --- .../org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java | 2 ++ .../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 6 ------ .../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 4 ---- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java index e420e03a2..cb52823bb 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java @@ -161,6 +161,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> { metricItem.sendFailCount.incrementAndGet(); metricItem.sendFailSize.addAndGet(event.getBody().length); } + metricItem.sendCount.incrementAndGet(); + metricItem.sendSize.addAndGet(event.getBody().length); } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 3785ae993..cac638ce5 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -402,14 +402,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag + "--> pulsar,Check if pulsar server or network is ok.(if this situation " + "last long time it will cause memoryChannel full and fileChannel write.)", getName()); tx.rollback(); - // metric - this.metricItemSet.fillSinkReadMetricItemsByEvent( - event, false, event.getBody().length); } else { tx.commit(); - // metric - this.metricItemSet.fillSinkReadMetricItemsByEvent( - event, true, event.getBody().length); } } else { status = Status.BACKOFF; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 86787136a..feb7fc390 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -273,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable { if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) { tx.commit(); cachedMsgCnt.incrementAndGet(); - this.metricItemSet.fillSinkReadMetricItemsByEvent( - event, true, event.getBody().length); } else { tx.rollback(); - this.metricItemSet.fillSinkReadMetricItemsByEvent( - event, false, event.getBody().length); } } else { status = Status.BACKOFF;