This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ebfe3711cb [INLONG-9758][Sort] StarRocks connector support state key 
when initializing (#9759)
ebfe3711cb is described below

commit ebfe3711cb120c6a36e448f24b1c89b2cb766c9b
Author: vernedeng <verned...@apache.org>
AuthorDate: Sun Mar 3 11:25:17 2024 +0800

    [INLONG-9758][Sort] StarRocks connector support state key when initializing 
(#9759)
    
    * [INLONG-9758][Sort] StarRocks connector support state key when 
initializing
    
    * fix checkstyle
---
 .../sink/table/StarRocksDynamicSinkFunctionV2.java    | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
index 01c50b9015..9df5f0e422 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -55,6 +55,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.NestedRowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +97,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
     private String auditHostAndPorts;
     private String auditKeys;
     private SchemaUtils schemaUtils;
+    private String stateKey;
 
     public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
             TableSchema schema,
@@ -120,6 +122,15 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
                 sinkOptions.getSemantic() == 
StarRocksSinkSemantic.AT_LEAST_ONCE);
     }
 
+    public StarRocksDynamicSinkFunctionV2(
+            StarRocksSinkOptions sinkOptions,
+            TableSchema schema,
+            StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
+            String auditHostAndPorts, String auditKeys, String stateKey) {
+        this(sinkOptions, schema, rowTransformer, inlongMetric, 
auditHostAndPorts, auditKeys);
+        this.stateKey = stateKey;
+    }
+
     @Override
     public void invoke(T value, Context context)
             throws IOException, ClassNotFoundException, JSQLParserException {
@@ -291,19 +302,23 @@ public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunct
             return;
         }
 
+        String transactionStateName = "starrocks-sink-transaction"
+                + (StringUtils.isNullOrWhitespaceOnly(stateKey) ? "" : "-" + 
stateKey);
         ListStateDescriptor<byte[]> descriptor =
                 new ListStateDescriptor<>(
-                        "starrocks-sink-transaction",
+                        transactionStateName,
                         TypeInformation.of(new TypeHint<byte[]>() {
                         }));
 
         ListState<byte[]> listState = 
functionInitializationContext.getOperatorStateStore().getListState(descriptor);
         snapshotStates = new SimpleVersionedListState<>(listState, new 
StarRocksVersionedSerializer());
 
+        String legacyStateName = "buffered-rows"
+                + (StringUtils.isNullOrWhitespaceOnly(stateKey) ? "" : "-" + 
stateKey);
         // old version
         ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> 
legacyDescriptor =
                 new ListStateDescriptor<>(
-                        "buffered-rows",
+                        legacyStateName,
                         TypeInformation.of(new TypeHint<Map<String, 
StarRocksSinkBufferEntity>>() {
                         }));
         legacyState = 
functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);

Reply via email to