This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d56e4107a [INLONG-10312][Sort] Iceberg sink support report audit information exactly once (#10453)
5d56e4107a is described below

commit 5d56e4107aaf00c7cb6da3e04f1d9b512013109b
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Tue Jun 25 09:53:32 2024 +0800

    [INLONG-10312][Sort] Iceberg sink support report audit information exactly once (#10453)
---
 .../inlong/sort/base/metric/SinkExactlyMetric.java |  2 +-
 .../sort/iceberg/sink/IcebergStreamWriter.java     |  6 +++++
 .../iceberg/sink/IcebergStreamWriterMetrics.java   | 28 +++++++++++++++-------
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
index 5c916a1b56..cb3d20f389 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
@@ -230,7 +230,7 @@ public class SinkExactlyMetric implements MetricData, Serializable {
         invokeDirty(1, getDataSize(o));
     }
 
-    public void invokeWithId(long rowCount, long rowSize, long dataTime) {
+    public void invoke(long rowCount, long rowSize, long dataTime) {
         outputDefaultMetrics(rowCount, rowSize);
         outputAuditMetricsWithId(rowCount, rowSize, dataTime);
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index b318306380..eb62400ad8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -81,7 +81,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
     @Override
     public void snapshotState(StateSnapshotContext context) {
+        writerMetrics.updateCurrentCheckpointId(context.getCheckpointId());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
         writerMetrics.flushAudit();
+        writerMetrics.updateLastCheckpointId(checkpointId);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
index 72ca7e0cf5..3a219cd6d5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.iceberg.sink;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SinkExactlyMetric;
 
 import com.codahale.metrics.SlidingWindowReservoir;
 import lombok.extern.slf4j.Slf4j;
@@ -41,8 +41,8 @@ class IcebergStreamWriterMetrics {
     // It should also produce good accuracy for histogram distribution (like percentiles).
     private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
 
-    private MetricGroup metrics;
-    private SourceMetricData sourceMetricData;
+    private final MetricGroup metrics;
+    private SinkExactlyMetric sinkExactlyMetric;
 
     private final Counter flushedDataFiles;
     private final Counter flushedDeleteFiles;
@@ -77,7 +77,7 @@ class IcebergStreamWriterMetrics {
 
     public void registerMetrics(MetricOption metricOption) {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metrics);
+            sinkExactlyMetric = new SinkExactlyMetric(metricOption, metrics);
         } else {
             log.warn("failed to init sourceMetricData since the metricOption is null");
         }
@@ -109,14 +109,26 @@ class IcebergStreamWriterMetrics {
     }
 
     void outputMetricsWithEstimate(int size, long time) {
-        if (sourceMetricData != null) {
-            sourceMetricData.outputMetrics(1, size, time);
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.invoke(1, size, time);
         }
     }
 
     void flushAudit() {
-        if (sourceMetricData != null) {
-            sourceMetricData.flushAuditData();
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.flushAudit();
+        }
+    }
+
+    void updateCurrentCheckpointId(long checkpointId) {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    void updateLastCheckpointId(long checkpointId) {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
 }

Reply via email to