This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 32f24517f [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823) 32f24517f is described below commit 32f24517f969b237baeb8f1388a46a5285fef425 Author: Oneal65 <liush...@foxmail.com> AuthorDate: Thu Sep 8 13:02:23 2022 +0800 [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823) --- .../apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 7 ------- .../sort/elasticsearch/table/RowElasticsearchSinkFunction.java | 5 +++-- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index bb963b0a1..000e1c23a 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -274,10 +274,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup()); sinkMetricData.registerMetricsForDirtyBytes(); sinkMetricData.registerMetricsForDirtyRecords(); - sinkMetricData.registerMetricsForNumBytesOut(); - sinkMetricData.registerMetricsForNumRecordsOut(); - sinkMetricData.registerMetricsForNumBytesOutPerSecond(); - sinkMetricData.registerMetricsForNumRecordsOutPerSecond(); } callBridge.verifyClientConnection(client); bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData)); @@ -513,9 +509,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends } } } - if (sinkMetricData.getNumRecordsOut() != null) { - sinkMetricData.getNumRecordsOut().inc(); - } } } catch (Throwable t) { // fail the sink and skip the rest of the items diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index a5a50b20f..0ae93231d 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -100,8 +100,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup()); - sinkMetricData.registerMetricsForDirtyBytes(); - sinkMetricData.registerMetricsForDirtyRecords(); sinkMetricData.registerMetricsForNumBytesOut(); sinkMetricData.registerMetricsForNumRecordsOut(); sinkMetricData.registerMetricsForNumBytesOutPerSecond(); @@ -130,6 +128,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R if (sinkMetricData.getNumBytesOut() != null) { sinkMetricData.getNumBytesOut().inc(document.length); } + if (sinkMetricData.getNumRecordsOut() != null) { + sinkMetricData.getNumRecordsOut().inc(); + } outputMetricForAudit(document.length); }