This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c2c2c3069 [INLONG-6005][DataProxy] Fix resend read values (#6006)
c2c2c3069 is described below

commit c2c2c3069d325c4cf9040c9a96452757d6a0b93b
Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com>
AuthorDate: Fri Sep 23 18:43:51 2022 +0800

    [INLONG-6005][DataProxy] Fix resend read values (#6006)
---
 .../org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java | 2 ++
 .../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java  | 6 ------
 .../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java    | 4 ----
 3 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index e420e03a2..cb52823bb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -161,6 +161,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                 metricItem.sendFailCount.incrementAndGet();
                 metricItem.sendFailSize.addAndGet(event.getBody().length);
             }
+            metricItem.sendCount.incrementAndGet();
+            metricItem.sendSize.addAndGet(event.getBody().length);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 3785ae993..cac638ce5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -402,14 +402,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                             + "--> pulsar,Check if pulsar server or network is ok.(if this situation "
                             + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
                     tx.rollback();
-                    // metric
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, false, event.getBody().length);
                 } else {
                     tx.commit();
-                    // metric
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, true, event.getBody().length);
                 }
             } else {
                 status = Status.BACKOFF;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 86787136a..feb7fc390 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -273,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable {
                 if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
                     tx.commit();
                     cachedMsgCnt.incrementAndGet();
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, true, event.getBody().length);
                 } else {
                     tx.rollback();
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, false, event.getBody().length);
                 }
             } else {
                 status = Status.BACKOFF;

Reply via email to