This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 97e2bc184c [INLONG-9404][Sort] Fix StarRocks Audit lost when stop job immediately after checkpoint (#9418)
97e2bc184c is described below

commit 97e2bc184c986e04a5769d79eda441ed1f18ae80
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Tue Dec 5 16:47:40 2023 +0800

    [INLONG-9404][Sort] Fix StarRocks Audit lost when stop job immediately after checkpoint (#9418)
---
 .../org/apache/inlong/sort/base/metric/SinkMetricData.java     | 10 ++++++++++
 .../table/sink/table/StarRocksDynamicSinkFunctionV2.java       |  9 +++++++++
 2 files changed, 19 insertions(+)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 4b48c4ece2..1b09343b5c 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -279,6 +279,16 @@ public class SinkMetricData implements MetricData, Serializable {
         outputAuditMetrics(rowCount, rowSize, dataTime);
     }
 
+    /**
+     * flush audit data
+     * usually call this method in close method or when checkpointing
+     */
+    public void flushAuditData() {
+        if (auditOperator != null) {
+            auditOperator.send();
+        }
+    }
+
     private void outputAuditMetrics(long rowCount, long rowSize, long dataTime) {
         if (auditOperator != null) {
             for (Integer key : auditKeys) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
index 70326a67a3..01c50b9015 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -255,6 +255,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
         sinkManager.flush();
+
+        flushAudit();
+
         if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
             return;
         }
@@ -276,6 +279,12 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct
         }
     }
 
+    private void flushAudit() {
+        if (sinkMetricData != null) {
+            sinkMetricData.flushAuditData();
+        }
+    }
+
     @Override
     public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
         if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {

Reply via email to