This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 0176ef4fac [ISSUE #9149]Assign offset in offsetTable even if the subscription key not exist. (#9150)
0176ef4fac is described below

commit 0176ef4fac80ace7b972088381c80d71f7475a6e
Author: dingshuangxi888 <dingshuangxi...@gmail.com>
AuthorDate: Thu Feb 6 16:48:31 2025 +0800

    [ISSUE #9149]Assign offset in offsetTable even if the subscription key not exist. (#9150)

    * Assign offset in offsetTable even if the subscription key not exist.
---
 .../broker/offset/ConsumerOffsetManager.java | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ea46f1d8a1..85bc8e3789 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -417,27 +418,14 @@ public class ConsumerOffsetManager extends ConfigManager {
         }
 
         String key = topic + TOPIC_GROUP_SEPARATOR + group;
-        ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
-        if (null == map) {
-            map = new ConcurrentHashMap<Integer, Long>();
-            ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map);
-            if (null != previous) {
-                map = previous;
-            }
-        }
-
-        map.put(queueId, offset);
-        LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}",
-            topic, group, queueId, offset);
+        resetOffsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
+        LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset);
 
         // Two things are important here:
         // 1, currentOffsetMap might be null if there is no previous records;
         // 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
         // sense in cases like clients are offline.
-        ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
-        if (null != currentOffsetMap) {
-            currentOffsetMap.put(queueId, offset);
-        }
+        offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
     }
 
     public boolean hasOffsetReset(String topic, String group, int queueId) {