This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f92147e2 [INLONG-9384][Sort] Fix pulsar audit data loss when 
restarting (#9386)
65f92147e2 is described below

commit 65f92147e2dce57c28b9e6ea8c42c4ed903939b3
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Fri Dec 1 18:54:25 2023 +0800

    [INLONG-9384][Sort] Fix pulsar audit data loss when restarting (#9386)
---
 .../org/apache/inlong/sort/base/metric/SourceMetricData.java   | 10 ++++++++++
 .../apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java  | 10 ++++++++++
 2 files changed, 20 insertions(+)

diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 0a0cad5d6e..1e1a624762 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -298,6 +298,16 @@ public class SourceMetricData implements MetricData, 
Serializable {
         }
     }
 
+    /**
+     * flush audit data
+     * usually call this method in close method or when checkpointing
+     */
+    public void flushAuditData() {
+        if (auditOperator != null) {
+            auditOperator.send();
+        }
+    }
+
     public void outputMetrics(long rowCountSize, long rowDataSize, long 
dataTime) {
         outputDefaultMetrics(rowCountSize, rowDataSize);
         if (auditOperator != null) {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index b193a5af1a..a963edf02a 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -886,6 +886,9 @@ public class FlinkPulsarSource<T>
         if (!running) {
             log.debug("snapshotState() called on closed source");
         } else {
+
+            flushAudit();
+
             unionOffsetStates.clear();
 
             PulsarFetcher<T> fetcher = this.pulsarFetcher;
@@ -925,6 +928,13 @@ public class FlinkPulsarSource<T>
         }
     }
 
+    // flush audit data first to avoid audit data loss
+    private void flushAudit() {
+        if (sourceMetricData != null) {
+            sourceMetricData.flushAuditData();
+        }
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         if (!running) {

Reply via email to