This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 8af33aa154 [INLONG-11738][SDK] Optimize the metric output when MetricDataHolder exits (#11739)
8af33aa154 is described below

commit 8af33aa1543faf101e6c4b5f5bdfefa59749fb22
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Feb 10 10:52:03 2025 +0800

    [INLONG-11738][SDK] Optimize the metric output when MetricDataHolder exits (#11739)
---
 .../sdk/dataproxy/metric/MetricDataHolder.java | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 03e260789a..9b8e19560c 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -73,7 +73,7 @@ public class MetricDataHolder implements Runnable {
     @Override
     public void run() {
         long startTime = System.currentTimeMillis();
-        outputMetricData(startTime, getAndIncIndex());
+        outputMetricData(false, startTime, getAndIncIndex());
         long dltTime = System.currentTimeMillis() - startTime;
         if (dltTime > this.metricConfig.getMetricOutWarnIntMs()) {
             logger.warn("Metric DataHolder({}) snapshot finished, cost = {} ms!",
@@ -88,8 +88,8 @@ public class MetricDataHolder implements Runnable {
         long startTime = System.currentTimeMillis();
         this.started = false;
         this.outputExecutor.shutdown();
-        outputMetricData(startTime, getOldIndex());
-        outputMetricData(startTime, getCurIndex());
+        outputMetricData(true, startTime, getOldIndex());
+        outputMetricData(true, startTime, getCurIndex());
         logger.info("Metric DataHolder({}) closed, cost = {} ms!",
                 this.sender.getSenderId(), System.currentTimeMillis() - startTime);
     }
@@ -163,8 +163,11 @@ public class MetricDataHolder implements Runnable {
         }
     }
 
-    private void outputMetricData(long reportTime, int readIndex) {
-        if (!this.started || !this.metricConfig.isEnableMetric()) {
+    private void outputMetricData(boolean forceOutput, long reportTime, int readIndex) {
+        if (!this.metricConfig.isEnableMetric()) {
+            return;
+        }
+        if (!forceOutput && !this.started) {
             return;
         }
         MetricInfoUnit selectedUnit = metricUnits[readIndex];
@@ -173,7 +176,8 @@ public class MetricDataHolder implements Runnable {
         }
         long startTime = System.currentTimeMillis();
         do {
-            if (System.currentTimeMillis() - startTime >= 5000L) {
+            if ((!forceOutput && !this.started)
+                    || (System.currentTimeMillis() - startTime >= 5000L)) {
                 break;
             }
             try {
@@ -181,8 +185,8 @@ public class MetricDataHolder implements Runnable {
             } catch (InterruptedException e) {
                 break;
             }
-        } while (started && selectedUnit.refCnt.get() > 0);
-        if (!started) {
+        } while (selectedUnit.refCnt.get() > 0);
+        if (!forceOutput && !this.started) {
             logger.info("Metric DataHolder({}) closed, stop output metric info",
                     sender.getSenderId());
             return;