This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0394181db [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) 0394181db is described below commit 0394181db774a946c88cf3ee2b0668e644e81465 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:04:01 2022 +0800 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) Co-authored-by: thesumery <158971...@qq.com> --- .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 8318c7177..6f1b75ea0 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import 
org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -53,6 +65,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> private transient int attemptId; @Nullable private transient SinkMetricData metricData; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; IcebergStreamWriter( String fullTableName, @@ -81,6 +95,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -105,6 +121,31 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> } } + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + // init metric state + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + @Override public void dispose() throws Exception { super.dispose();