This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d37c0a6ccd [INLONG-9380][Sort] Audit lost when stop job immediately after checkpoint (#9396) d37c0a6ccd is described below commit d37c0a6ccd465aba38e2612faa9ee99606493d11 Author: vernedeng <verned...@apache.org> AuthorDate: Mon Dec 4 14:20:44 2023 +0800 [INLONG-9380][Sort] Audit lost when stop job immediately after checkpoint (#9396) --- .../inlong/sort/base/metric/SourceMetricData.java | 20 ++++++++++---------- .../sort/iceberg/sink/IcebergStreamWriter.java | 6 ++++++ .../iceberg/sink/IcebergStreamWriterMetrics.java | 6 ++++++ .../iceberg/source/reader/IcebergSourceReader.java | 8 ++++++++ .../reader/InlongIcebergSourceReaderMetrics.java | 6 ++++++ .../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 2 ++ .../table/DynamicTubeMQDeserializationSchema.java | 2 ++ .../DynamicTubeMQTableDeserializationSchema.java | 7 +++++++ 8 files changed, 47 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 1e1a624762..91abcf22aa 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -298,16 +298,6 @@ public class SourceMetricData implements MetricData, Serializable { } } - /** - * flush audit data - * usually call this method in close method or when checkpointing - */ - public void flushAuditData() { - if (auditOperator != null) { - auditOperator.send(); - } - } - public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime) { outputDefaultMetrics(rowCountSize, rowDataSize); if (auditOperator != null) { @@ -345,6 +335,16 @@ public class SourceMetricData implements MetricData, Serializable { } } + /** + * flush audit data + * usually call this method in close method or when checkpointing + */ + public void flushAuditData() { + if (auditOperator != null) { + auditOperator.send(); + } + } + private void outputDefaultMetrics(long rowCountSize, long rowDataSize, long fetchDelay, long emitDelay) { outputDefaultMetrics(rowCountSize, rowDataSize); this.fetchDelay = fetchDelay; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 0cf31c206e..b318306380 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.iceberg.sink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -78,6 +79,11 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> this.writer = taskWriterFactory.create(); } + @Override + public void snapshotState(StateSnapshotContext context) { + writerMetrics.flushAudit(); + } + @Override public void processElement(StreamRecord<T> element) throws Exception { T data = element.getValue(); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java index 1d627714bc..72ca7e0cf5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java @@ -113,4 +113,10 @@ class IcebergStreamWriterMetrics { sourceMetricData.outputMetrics(1, size, time); } } + + void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java index ad3a9b13d4..df75723ceb 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java @@ -28,6 +28,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -38,6 +39,7 @@ public class IcebergSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> { + private final InlongIcebergSourceReaderMetrics<T> metrics; public IcebergSourceReader( InlongIcebergSourceReaderMetrics<T> metrics, ReaderFunction<T> readerFunction, @@ -47,6 +49,7 @@ public class IcebergSourceReader<T> new IcebergSourceRecordEmitter<>(), context.getConfiguration(), context); + this.metrics = metrics; } @Override @@ -62,6 +65,11 @@ public class IcebergSourceReader<T> protected void onSplitFinished(Map<String, IcebergSourceSplit> finishedSplitIds) { requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); } + @Override + public List<IcebergSourceSplit> snapshotState(long checkpointId) { + metrics.flushAudit(); + return super.snapshotState(checkpointId); + } @Override protected IcebergSourceSplit initializedState(IcebergSourceSplit split) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java index 252ae4580d..2210fbca02 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java @@ -77,4 +77,10 @@ public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetr } return object.toString().getBytes(StandardCharsets.UTF_8).length; } + + void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index b9fb6d1b0d..1f261cfef5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -317,6 +317,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue())); } + deserializationSchema.flushAudit(); + LOG.info("Successfully save the offsets in checkpoint {}: {}.", context.getCheckpointId(), currentOffsets); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java index 4c4eaac841..c6ec9ea9cb 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java @@ -58,4 +58,6 @@ public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, Res out.collect(deserialize); } } + + void flushAudit(); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index 8ee154c535..3f2a57d7c7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -109,6 +109,13 @@ public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDes } + @Override + public void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } + @Override public TypeInformation<RowData> getProducedType() { return producedTypeInfo;