This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 65f92147e2 [INLONG-9384][Sort] Fix pulsar audit data loss when restarting (#9386) 65f92147e2 is described below commit 65f92147e2dce57c28b9e6ea8c42c4ed903939b3 Author: Sting <zpen...@connect.ust.hk> AuthorDate: Fri Dec 1 18:54:25 2023 +0800 [INLONG-9384][Sort] Fix pulsar audit data loss when restarting (#9386) --- .../org/apache/inlong/sort/base/metric/SourceMetricData.java | 10 ++++++++++ .../apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 0a0cad5d6e..1e1a624762 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -298,6 +298,16 @@ public class SourceMetricData implements MetricData, Serializable { } } + /** + * flush audit data + * usually call this method in close method or when checkpointing + */ + public void flushAuditData() { + if (auditOperator != null) { + auditOperator.send(); + } + } + public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime) { outputDefaultMetrics(rowCountSize, rowDataSize); if (auditOperator != null) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java index b193a5af1a..a963edf02a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java @@ -886,6 +886,9 @@ public class FlinkPulsarSource<T> if (!running) { log.debug("snapshotState() called on closed source"); } else { + + flushAudit(); + unionOffsetStates.clear(); PulsarFetcher<T> fetcher = this.pulsarFetcher; @@ -925,6 +928,13 @@ public class FlinkPulsarSource<T> } } + // flush audit data first to avoid audit data loss + private void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { if (!running) {