This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b29cc0d57 [INLONG-10357][Sort] Make StarRocks sink support report audit information exactly once (#10549)
5b29cc0d57 is described below

commit 5b29cc0d57d9cf83a7acc079d2579317d4ff7f6b
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Tue Jul 2 18:17:36 2024 +0800

    [INLONG-10357][Sort] Make StarRocks sink support report audit information exactly once (#10549)
---
 .../sink/table/StarRocksDynamicSinkFunctionV2.java | 49 +++++++++++++++-------
 .../starrocks/table/sink/utils/SchemaUtils.java    |  6 +--
 2 files changed, 36 insertions(+), 19 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
index 9df5f0e422..b4e915eec9 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.starrocks.table.sink.table;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.SinkExactlyMetric;
 import org.apache.inlong.sort.starrocks.table.sink.utils.SchemaUtils;
 
 import com.google.common.base.Strings;
@@ -70,6 +70,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
 /**
  * StarRocks dynamic sink function. It supports insert, upsert, delete in 
Starrocks.
  * @param <T>
@@ -87,7 +89,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
 
     private transient volatile ListState<StarrocksSnapshotState> 
snapshotStates;
     private final Map<Long, List<StreamLoadSnapshot>> snapshotMap = new 
ConcurrentHashMap<>();
-    private transient SinkMetricData sinkMetricData;
+    private transient SinkExactlyMetric sinkExactlyMetric;
 
     @Deprecated
     private transient ListState<Map<String, StarRocksSinkBufferEntity>> 
legacyState;
@@ -207,19 +209,18 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
 
         Object[] data = rowTransformer.transform(value, 
sinkOptions.supportUpsertDelete());
 
-        ouputMetrics(value, data);
-
         sinkManager.write(
                 null,
                 sinkOptions.getDatabaseName(),
                 sinkOptions.getTableName(),
                 serializer.serialize(schemaUtils.filterOutTimeField(data)));
 
+        ouputMetrics(value, data);
     }
 
     private void ouputMetrics(T value, Object[] data) {
-        if (sinkMetricData != null) {
-            sinkMetricData.invokeWithEstimate(value, 
schemaUtils.getDataTime(data));
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.invoke(1, getDataSize(value), 
schemaUtils.getDataTime(data));
         }
     }
 
@@ -237,10 +238,10 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
                 .build();
 
         if (metricOption != null) {
-            sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+            sinkExactlyMetric = new SinkExactlyMetric(metricOption, 
getRuntimeContext().getMetricGroup());
         }
 
-        notifyCheckpointComplete(Long.MAX_VALUE);
+        commitTransaction(Long.MAX_VALUE);
         log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
     }
 
@@ -265,10 +266,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
+        updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
         sinkManager.flush();
 
-        flushAudit();
-
         if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
             return;
         }
@@ -290,12 +290,6 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
         }
     }
 
-    private void flushAudit() {
-        if (sinkMetricData != null) {
-            sinkMetricData.flushAuditData();
-        }
-    }
-
     @Override
     public void initializeState(FunctionInitializationContext 
functionInitializationContext) throws Exception {
         if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
@@ -346,7 +340,12 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
+        commitTransaction(checkpointId);
+        flushAudit();
+        updateLastCheckpointId(checkpointId);
+    }
 
+    private void commitTransaction(long checkpointId) {
         boolean succeed = true;
 
         List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
@@ -395,4 +394,22 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
         legacyData.clear();
     }
 
+    private void flushAudit() {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.flushAudit();
+        }
+    }
+
+    private void updateCurrentCheckpointId(long checkpointId) {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    private void updateLastCheckpointId(long checkpointId) {
+        if (sinkExactlyMetric != null) {
+            sinkExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
+
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
index 76e91e6cf3..a2adb72091 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
@@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final String AUDIT_DATA_TIME = "audit_data_time";
+    private final String AUDIT_DATA_TIME = "AUDIT_DATA_TIME";
     private final int DATA_TIME_ABSENT_INDEX = -1;
     private final int dataTimeFieldIndex;
 
@@ -73,7 +73,7 @@ public class SchemaUtils implements Serializable {
      */
     public String[] filterOutTimeField(TableSchema schema) {
         return Arrays.stream(schema.getFieldNames())
-                .filter(field -> !AUDIT_DATA_TIME.equals(field))
+                .filter(field -> !AUDIT_DATA_TIME.equalsIgnoreCase(field))
                 .toArray(String[]::new);
     }
 
@@ -84,7 +84,7 @@ public class SchemaUtils implements Serializable {
      */
     private int getDataTimeIndex(String[] fieldNames) {
         for (int i = 0; i < fieldNames.length; i++) {
-            if (AUDIT_DATA_TIME.equals(fieldNames[i])) {
+            if (AUDIT_DATA_TIME.equalsIgnoreCase(fieldNames[i])) {
                 return i;
             }
         }

Reply via email to