This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a859e7973 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) a859e7973 is described below commit a859e7973b699c4b3a4e6c8d5919d63a8e41efb7 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:02:06 2022 +0800 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) Co-authored-by: thesumery <158971...@qq.com> --- .../hive/filesystem/AbstractStreamingWriter.java | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index ab2e845f2..dd9456203 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -18,6 +18,10 @@ package org.apache.inlong.sort.hive.filesystem; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; + import javax.annotation.Nullable; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe @Nullable private transient SinkMetricData metricData; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; public AbstractStreamingWriter( long bucketCheckInterval, @@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + + // init metric state + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); helper.snapshotState(context.getCheckpointId()); + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } @Override