This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3db440b0c6 [INLONG-11805][Sort] Restored Checkpoint Id as part of Tube Connector Session Key (#11806) 3db440b0c6 is described below commit 3db440b0c64eca4ac9ac1c929cad70a23b235ca3 Author: vernedeng <verned...@apache.org> AuthorDate: Mon Mar 17 16:15:09 2025 +0800 [INLONG-11805][Sort] Restored Checkpoint Id as part of Tube Connector Session Key (#11806) --- .../org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index 47f17eb95d..ab46b972aa 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -144,6 +144,10 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> * The TubeMQ pull consumer. */ private transient PullMessageConsumer messagePullConsumer; + /** + * The restore checkpoint id. + */ + private transient Long restoredCheckpointId; /** * Build a TubeMQ source function @@ -190,11 +194,12 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> @Override public void initializeState(FunctionInitializationContext context) throws Exception { + restoredCheckpointId = context.getRestoredCheckpointId().orElse(-1L); + TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO); ListStateDescriptor<Tuple2<String, Long>> stateDescriptor = new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation); - OperatorStateStore stateStore = context.getOperatorStateStore(); offsetsState = stateStore.getListState(stateDescriptor); currentOffsets = new HashMap<>(); @@ -224,7 +229,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); messagePullConsumer.subscribe(topic, streamIdSet); String jobId = getRuntimeContext().getJobId().toString(); - messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets); + String realSessionKey = sessionKey + jobId + restoredCheckpointId; + messagePullConsumer.completeSubscribe(realSessionKey, numTasks, true, currentOffsets); running = true; }