This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3db440b0c6 [INLONG-11805][Sort] Restored Checkpoint Id as part of Tube 
Connector Session Key (#11806)
3db440b0c6 is described below

commit 3db440b0c64eca4ac9ac1c929cad70a23b235ca3
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Mar 17 16:15:09 2025 +0800

    [INLONG-11805][Sort] Restored Checkpoint Id as part of Tube Connector 
Session Key (#11806)
---
 .../org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 47f17eb95d..ab46b972aa 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -144,6 +144,10 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
      * The TubeMQ pull consumer.
      */
     private transient PullMessageConsumer messagePullConsumer;
+    /**
+     * The restore checkpoint id.
+     */
+    private transient Long restoredCheckpointId;
 
     /**
      * Build a TubeMQ source function
@@ -190,11 +194,12 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        restoredCheckpointId = context.getRestoredCheckpointId().orElse(-1L);
+
         TypeInformation<Tuple2<String, Long>> typeInformation =
                 new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
         ListStateDescriptor<Tuple2<String, Long>> stateDescriptor =
                 new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation);
-
         OperatorStateStore stateStore = context.getOperatorStateStore();
         offsetsState = stateStore.getListState(stateDescriptor);
         currentOffsets = new HashMap<>();
@@ -224,7 +229,8 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
         messagePullConsumer = 
messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, streamIdSet);
         String jobId = getRuntimeContext().getJobId().toString();
-        messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), 
numTasks, true, currentOffsets);
+        String realSessionKey = sessionKey + jobId + restoredCheckpointId;
+        messagePullConsumer.completeSubscribe(realSessionKey, numTasks, true, 
currentOffsets);
 
         running = true;
     }

Reply via email to