This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 341aa987b [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) 341aa987b is described below commit 341aa987ba86d0598af321d8e5a164a02224ff26 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:05:23 2022 +0800 [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) Co-authored-by: thesumery <158971...@qq.com> --- .../inlong/sort/base/metric/MetricOption.java | 8 +++++ .../iceberg/flink/sink/IcebergStreamWriter.java | 41 ++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index f4c679f9c..8cf0d6f01 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -111,6 +111,14 @@ public class MetricOption { return initBytes; } + public void setInitRecords(long initRecords) { + this.initRecords = initRecords; + } + + public void setInitBytes(long initBytes) { + this.initBytes = initBytes; + } + public static Builder builder() { return new Builder(); } diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java index bb00b7808..ef7612743 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.flink.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -51,6 +63,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> private transient int attemptId; @Nullable private transient SinkMetricData metricData; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory, @@ -74,6 +88,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> // Initialize metric if (metricOption != null) { + metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L); + metricOption.setInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L); metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } @@ -95,6 +111,31 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> } } + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + // init metric state + if (this.metricData != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + @Override public void close() throws Exception { super.close();