This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ebfe3711cb [INLONG-9758][Sort] StarRocks connector support state key when initializing (#9759) ebfe3711cb is described below commit ebfe3711cb120c6a36e448f24b1c89b2cb766c9b Author: vernedeng <verned...@apache.org> AuthorDate: Sun Mar 3 11:25:17 2024 +0800 [INLONG-9758][Sort] StarRocks connector support state key when initializing (#9759) * [INLONG-9758][Sort] StarRocks connector support state key when initializing * fix checkstyle --- .../sink/table/StarRocksDynamicSinkFunctionV2.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java index 01c50b9015..9df5f0e422 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java @@ -55,6 +55,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.NestedRowData; import org.apache.flink.types.RowKind; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +97,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct private String auditHostAndPorts; private String auditKeys; private SchemaUtils schemaUtils; + private String stateKey; public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, TableSchema schema, @@ -120,6 +122,15 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); } + public StarRocksDynamicSinkFunctionV2( + StarRocksSinkOptions sinkOptions, + TableSchema schema, + StarRocksIRowTransformer<T> rowTransformer, String inlongMetric, + String auditHostAndPorts, String auditKeys, String stateKey) { + this(sinkOptions, schema, rowTransformer, inlongMetric, auditHostAndPorts, auditKeys); + this.stateKey = stateKey; + } + @Override public void invoke(T value, Context context) throws IOException, ClassNotFoundException, JSQLParserException { @@ -291,19 +302,23 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct return; } + String transactionStateName = "starrocks-sink-transaction" + + (StringUtils.isNullOrWhitespaceOnly(stateKey) ? "" : "-" + stateKey); ListStateDescriptor<byte[]> descriptor = new ListStateDescriptor<>( - "starrocks-sink-transaction", + transactionStateName, TypeInformation.of(new TypeHint<byte[]>() { })); ListState<byte[]> listState = functionInitializationContext.getOperatorStateStore().getListState(descriptor); snapshotStates = new SimpleVersionedListState<>(listState, new StarRocksVersionedSerializer()); + String legacyStateName = "buffered-rows" + + (StringUtils.isNullOrWhitespaceOnly(stateKey) ? "" : "-" + stateKey); // old version ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> legacyDescriptor = new ListStateDescriptor<>( - "buffered-rows", + legacyStateName, TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() { })); legacyState = functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);