This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 97e2bc184c [INLONG-9404][Sort] Fix StarRocks Audit lost when stop job immediately after checkpoint (#9418) 97e2bc184c is described below commit 97e2bc184c986e04a5769d79eda441ed1f18ae80 Author: Sting <zpen...@connect.ust.hk> AuthorDate: Tue Dec 5 16:47:40 2023 +0800 [INLONG-9404][Sort] Fix StarRocks Audit lost when stop job immediately after checkpoint (#9418) --- .../org/apache/inlong/sort/base/metric/SinkMetricData.java | 10 ++++++++++ .../table/sink/table/StarRocksDynamicSinkFunctionV2.java | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 4b48c4ece2..1b09343b5c 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -279,6 +279,16 @@ public class SinkMetricData implements MetricData, Serializable { outputAuditMetrics(rowCount, rowSize, dataTime); } + /** + * flush audit data + * usually call this method in close method or when checkpointing + */ + public void flushAuditData() { + if (auditOperator != null) { + auditOperator.send(); + } + } + private void outputAuditMetrics(long rowCount, long rowSize, long dataTime) { if (auditOperator != null) { for (Integer key : auditKeys) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java index 70326a67a3..01c50b9015 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java @@ -255,6 +255,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { sinkManager.flush(); + + flushAudit(); + if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { return; } @@ -276,6 +279,12 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct } } + private void flushAudit() { + if (sinkMetricData != null) { + sinkMetricData.flushAuditData(); + } + } + @Override public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {