This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8af33aa154 [INLONG-11738][SDK] Optimize the metric output when MetricDataHolder exits (#11739)
8af33aa154 is described below

commit 8af33aa1543faf101e6c4b5f5bdfefa59749fb22
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Feb 10 10:52:03 2025 +0800

    [INLONG-11738][SDK] Optimize the metric output when MetricDataHolder exits (#11739)
---
 .../sdk/dataproxy/metric/MetricDataHolder.java       | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 03e260789a..9b8e19560c 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -73,7 +73,7 @@ public class MetricDataHolder implements Runnable {
     @Override
     public void run() {
         long startTime = System.currentTimeMillis();
-        outputMetricData(startTime, getAndIncIndex());
+        outputMetricData(false, startTime, getAndIncIndex());
         long dltTime = System.currentTimeMillis() - startTime;
         if (dltTime > this.metricConfig.getMetricOutWarnIntMs()) {
             logger.warn("Metric DataHolder({}) snapshot finished, cost = {} 
ms!",
@@ -88,8 +88,8 @@ public class MetricDataHolder implements Runnable {
         long startTime = System.currentTimeMillis();
         this.started = false;
         this.outputExecutor.shutdown();
-        outputMetricData(startTime, getOldIndex());
-        outputMetricData(startTime, getCurIndex());
+        outputMetricData(true, startTime, getOldIndex());
+        outputMetricData(true, startTime, getCurIndex());
         logger.info("Metric DataHolder({}) closed, cost = {} ms!",
                 this.sender.getSenderId(), System.currentTimeMillis() - 
startTime);
     }
@@ -163,8 +163,11 @@ public class MetricDataHolder implements Runnable {
         }
     }
 
-    private void outputMetricData(long reportTime, int readIndex) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+    private void outputMetricData(boolean forceOutput, long reportTime, int 
readIndex) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        if (!forceOutput && !this.started) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[readIndex];
@@ -173,7 +176,8 @@ public class MetricDataHolder implements Runnable {
         }
         long startTime = System.currentTimeMillis();
         do {
-            if (System.currentTimeMillis() - startTime >= 5000L) {
+            if ((!forceOutput && !this.started)
+                    || (System.currentTimeMillis() - startTime >= 5000L)) {
                 break;
             }
             try {
@@ -181,8 +185,8 @@ public class MetricDataHolder implements Runnable {
             } catch (InterruptedException e) {
                 break;
             }
-        } while (started && selectedUnit.refCnt.get() > 0);
-        if (!started) {
+        } while (selectedUnit.refCnt.get() > 0);
+        if (!forceOutput && !this.started) {
             logger.info("Metric DataHolder({}) closed, stop output metric 
info",
                     sender.getSenderId());
             return;

Reply via email to