This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5b29cc0d57 [INLONG-10357][Sort] Make StarRocks sink support report audit information exactly once (#10549) 5b29cc0d57 is described below commit 5b29cc0d57d9cf83a7acc079d2579317d4ff7f6b Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Tue Jul 2 18:17:36 2024 +0800 [INLONG-10357][Sort] Make StarRocks sink support report audit information exactly once (#10549) --- .../sink/table/StarRocksDynamicSinkFunctionV2.java | 49 +++++++++++++++------- .../starrocks/table/sink/utils/SchemaUtils.java | 6 +-- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java index 9df5f0e422..b4e915eec9 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java @@ -18,7 +18,7 @@ package org.apache.inlong.sort.starrocks.table.sink.table; import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.SinkExactlyMetric; import org.apache.inlong.sort.starrocks.table.sink.utils.SchemaUtils; import com.google.common.base.Strings; @@ -70,6 +70,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; + /** * StarRocks dynamic sink function. It supports insert, upsert, delete in Starrocks. * @param <T> @@ -87,7 +89,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct private transient volatile ListState<StarrocksSnapshotState> snapshotStates; private final Map<Long, List<StreamLoadSnapshot>> snapshotMap = new ConcurrentHashMap<>(); - private transient SinkMetricData sinkMetricData; + private transient SinkExactlyMetric sinkExactlyMetric; @Deprecated private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState; @@ -207,19 +209,18 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); - ouputMetrics(value, data); - sinkManager.write( null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), serializer.serialize(schemaUtils.filterOutTimeField(data))); + ouputMetrics(value, data); } private void ouputMetrics(T value, Object[] data) { - if (sinkMetricData != null) { - sinkMetricData.invokeWithEstimate(value, schemaUtils.getDataTime(data)); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data)); } } @@ -237,10 +238,10 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct .build(); if (metricOption != null) { - sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); } - notifyCheckpointComplete(Long.MAX_VALUE); + commitTransaction(Long.MAX_VALUE); log.info("Open sink function v2. {}", EnvUtils.getGitInformation()); } @@ -265,10 +266,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); sinkManager.flush(); - flushAudit(); - if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { return; } @@ -290,12 +290,6 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct } } - private void flushAudit() { - if (sinkMetricData != null) { - sinkMetricData.flushAuditData(); - } - } - @Override public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { @@ -346,7 +340,12 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct @Override public void notifyCheckpointComplete(long checkpointId) { + commitTransaction(checkpointId); + flushAudit(); + updateLastCheckpointId(checkpointId); + } + private void commitTransaction(long checkpointId) { boolean succeed = true; List<Long> commitCheckpointIds = snapshotMap.keySet().stream() @@ -395,4 +394,22 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct legacyData.clear(); } + private void flushAudit() { + if (sinkExactlyMetric != null) { + sinkExactlyMetric.flushAudit(); + } + } + + private void updateCurrentCheckpointId(long checkpointId) { + if (sinkExactlyMetric != null) { + sinkExactlyMetric.updateCurrentCheckpointId(checkpointId); + } + } + + private void updateLastCheckpointId(long checkpointId) { + if (sinkExactlyMetric != null) { + sinkExactlyMetric.updateLastCheckpointId(checkpointId); + } + } + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java index 76e91e6cf3..a2adb72091 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java @@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable { private static final long serialVersionUID = 1L; - private final String AUDIT_DATA_TIME = "audit_data_time"; + private final String AUDIT_DATA_TIME = "AUDIT_DATA_TIME"; private final int DATA_TIME_ABSENT_INDEX = -1; private final int dataTimeFieldIndex; @@ -73,7 +73,7 @@ public class SchemaUtils implements Serializable { */ public String[] filterOutTimeField(TableSchema schema) { return Arrays.stream(schema.getFieldNames()) - .filter(field -> !AUDIT_DATA_TIME.equals(field)) + .filter(field -> !AUDIT_DATA_TIME.equalsIgnoreCase(field)) .toArray(String[]::new); } @@ -84,7 +84,7 @@ public class SchemaUtils implements Serializable { */ private int getDataTimeIndex(String[] fieldNames) { for (int i = 0; i < fieldNames.length; i++) { - if (AUDIT_DATA_TIME.equals(fieldNames[i])) { + if (AUDIT_DATA_TIME.equalsIgnoreCase(fieldNames[i])) { return i; } }