This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f24517f [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
32f24517f is described below

commit 32f24517f969b237baeb8f1388a46a5285fef425
Author: Oneal65 <liush...@foxmail.com>
AuthorDate: Thu Sep 8 13:02:23 2022 +0800

    [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
---
 .../apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java    | 7 -------
 .../sort/elasticsearch/table/RowElasticsearchSinkFunction.java     | 5 +++--
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index bb963b0a1..000e1c23a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -274,10 +274,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
             sinkMetricData.registerMetricsForDirtyBytes();
             sinkMetricData.registerMetricsForDirtyRecords();
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
@@ -513,9 +509,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
                                 }
                             }
                         }
-                        if (sinkMetricData.getNumRecordsOut() != null) {
-                            sinkMetricData.getNumRecordsOut().inc();
-                        }
                     }
                 } catch (Throwable t) {
                     // fail the sink and skip the rest of the items
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index a5a50b20f..0ae93231d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -100,8 +100,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
             streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
             sinkMetricData.registerMetricsForNumBytesOut();
             sinkMetricData.registerMetricsForNumRecordsOut();
             sinkMetricData.registerMetricsForNumBytesOutPerSecond();
@@ -130,6 +128,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         if (sinkMetricData.getNumBytesOut() != null) {
             sinkMetricData.getNumBytesOut().inc(document.length);
         }
+        if (sinkMetricData.getNumRecordsOut() != null) {
+            sinkMetricData.getNumRecordsOut().inc();
+        }
         outputMetricForAudit(document.length);
     }
 

Reply via email to