This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d56e4107a [INLONG-10312][Sort] Iceberg sink support report audit information exactly once (#10453)
5d56e4107a is described below

commit 5d56e4107aaf00c7cb6da3e04f1d9b512013109b
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Tue Jun 25 09:53:32 2024 +0800

    [INLONG-10312][Sort] Iceberg sink support report audit information exactly once (#10453)
---
 .../inlong/sort/base/metric/SinkExactlyMetric.java |  2 +-
 .../sort/iceberg/sink/IcebergStreamWriter.java     |  6 +++++
 .../iceberg/sink/IcebergStreamWriterMetrics.java   | 28 +++++++++++++++-------
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
index 5c916a1b56..cb3d20f389 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
@@ -230,7 +230,7 @@ public class SinkExactlyMetric implements MetricData, Serializable {
         invokeDirty(1, getDataSize(o));
     }
 
-    public void invokeWithId(long rowCount, long rowSize, long dataTime) {
+    public void invoke(long rowCount, long rowSize, long dataTime) {
         outputDefaultMetrics(rowCount, rowSize);
         outputAuditMetricsWithId(rowCount, rowSize, dataTime);
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index b318306380..eb62400ad8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -81,7 +81,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
     @Override
     public void snapshotState(StateSnapshotContext context) {
+        writerMetrics.updateCurrentCheckpointId(context.getCheckpointId());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
         writerMetrics.flushAudit();
+        writerMetrics.updateLastCheckpointId(checkpointId);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
index 72ca7e0cf5..3a219cd6d5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.iceberg.sink;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SinkExactlyMetric;
 
 import com.codahale.metrics.SlidingWindowReservoir;
 import lombok.extern.slf4j.Slf4j;
@@ -41,8 +41,8 @@ class IcebergStreamWriterMetrics {
     // It should also produce good accuracy for histogram distribution (like percentiles).
     private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
 
-    private MetricGroup metrics;
-    private SourceMetricData sourceMetricData;
+    private final MetricGroup metrics;
+    private SinkExactlyMetric sinkExactlyMetric;
 
     private final Counter flushedDataFiles;
     private final Counter flushedDeleteFiles;
@@ -77,7 +77,7 @@ class IcebergStreamWriterMetrics {
 
     public void registerMetrics(MetricOption metricOption) {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metrics);
+            sinkExactlyMetric = new SinkExactlyMetric(metricOption, metrics);
         } else {
             log.warn("failed to init sourceMetricData since the metricOption is null");
         }
@@ -109,14 +109,26 @@ class IcebergStreamWriterMetrics {
     }
 
     void outputMetricsWithEstimate(int size, long time) {
-        if (sourceMetricData != null) {
-            sourceMetricData.outputMetrics(1, size, time);
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.invoke(1, size, time);
         }
     }
 
     void flushAudit() {
-        if (sourceMetricData != null) {
-            sourceMetricData.flushAuditData();
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.flushAudit();
+        }
+    }
+
+    void updateCurrentCheckpointId(long checkpointId) {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    void updateLastCheckpointId(long checkpointId) {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
 }