This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d37c0a6ccd [INLONG-9380][Sort] Audit lost when stop job immediately after checkpoint (#9396)
d37c0a6ccd is described below

commit d37c0a6ccd465aba38e2612faa9ee99606493d11
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Dec 4 14:20:44 2023 +0800

    [INLONG-9380][Sort] Audit lost when stop job immediately after checkpoint (#9396)
---
 .../inlong/sort/base/metric/SourceMetricData.java    | 20 ++++++++++----------
 .../sort/iceberg/sink/IcebergStreamWriter.java       |  6 ++++++
 .../iceberg/sink/IcebergStreamWriterMetrics.java     |  6 ++++++
 .../iceberg/source/reader/IcebergSourceReader.java   |  8 ++++++++
 .../reader/InlongIcebergSourceReaderMetrics.java     |  6 ++++++
 .../inlong/sort/tubemq/FlinkTubeMQConsumer.java      |  2 ++
 .../table/DynamicTubeMQDeserializationSchema.java    |  2 ++
 .../DynamicTubeMQTableDeserializationSchema.java     |  7 +++++++
 8 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 1e1a624762..91abcf22aa 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -298,16 +298,6 @@ public class SourceMetricData implements MetricData, Serializable {
         }
     }
 
-    /**
-     * flush audit data
-     * usually call this method in close method or when checkpointing
-     */
-    public void flushAuditData() {
-        if (auditOperator != null) {
-            auditOperator.send();
-        }
-    }
-
     public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime) {
         outputDefaultMetrics(rowCountSize, rowDataSize);
         if (auditOperator != null) {
@@ -345,6 +335,16 @@ public class SourceMetricData implements MetricData, Serializable {
         }
     }
 
+    /**
+     * flush audit data
+     * usually call this method in close method or when checkpointing
+     */
+    public void flushAuditData() {
+        if (auditOperator != null) {
+            auditOperator.send();
+        }
+    }
+
     private void outputDefaultMetrics(long rowCountSize, long rowDataSize, long fetchDelay, long emitDelay) {
         outputDefaultMetrics(rowCountSize, rowDataSize);
         this.fetchDelay = fetchDelay;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 0cf31c206e..b318306380 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.iceberg.sink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils;
 
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -78,6 +79,11 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         this.writer = taskWriterFactory.create();
     }
 
+    @Override
+    public void snapshotState(StateSnapshotContext context) {
+        writerMetrics.flushAudit();
+    }
+
     @Override
     public void processElement(StreamRecord<T> element) throws Exception {
         T data = element.getValue();
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
index 1d627714bc..72ca7e0cf5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
@@ -113,4 +113,10 @@ class IcebergStreamWriterMetrics {
             sourceMetricData.outputMetrics(1, size, time);
         }
     }
+
+    void flushAudit() {
+        if (sourceMetricData != null) {
+            sourceMetricData.flushAuditData();
+        }
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index ad3a9b13d4..df75723ceb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -38,6 +39,7 @@ public class IcebergSourceReader<T>
         extends
             SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {
 
+    private final InlongIcebergSourceReaderMetrics<T> metrics;
     public IcebergSourceReader(
             InlongIcebergSourceReaderMetrics<T> metrics,
             ReaderFunction<T> readerFunction,
@@ -47,6 +49,7 @@ public class IcebergSourceReader<T>
                 new IcebergSourceRecordEmitter<>(),
                 context.getConfiguration(),
                 context);
+        this.metrics = metrics;
     }
 
     @Override
@@ -62,6 +65,11 @@ public class IcebergSourceReader<T>
     protected void onSplitFinished(Map<String, IcebergSourceSplit> finishedSplitIds) {
         requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
     }
+    @Override
+    public List<IcebergSourceSplit> snapshotState(long checkpointId) {
+        metrics.flushAudit();
+        return super.snapshotState(checkpointId);
+    }
 
     @Override
     protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
index 252ae4580d..2210fbca02 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -77,4 +77,10 @@ public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetr
         }
         return object.toString().getBytes(StandardCharsets.UTF_8).length;
     }
+
+    void flushAudit() {
+        if (sourceMetricData != null) {
+            sourceMetricData.flushAuditData();
+        }
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index b9fb6d1b0d..1f261cfef5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -317,6 +317,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
             offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
         }
 
+        deserializationSchema.flushAudit();
+
         LOG.info("Successfully save the offsets in checkpoint {}: {}.",
                 context.getCheckpointId(), currentOffsets);
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index 4c4eaac841..c6ec9ea9cb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -58,4 +58,6 @@ public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, Res
             out.collect(deserialize);
         }
     }
+
+    void flushAudit();
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index 8ee154c535..3f2a57d7c7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -109,6 +109,13 @@ public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDes
 
     }
 
+    @Override
+    public void flushAudit() {
+        if (sourceMetricData != null) {
+            sourceMetricData.flushAuditData();
+        }
+    }
+
     @Override
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;

Reply via email to