This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0176ef4fac [ISSUE #9149]Assign offset in offsetTable even if the subscription key not exist. (#9150)
0176ef4fac is described below

commit 0176ef4fac80ace7b972088381c80d71f7475a6e
Author: dingshuangxi888 <dingshuangxi...@gmail.com>
AuthorDate: Thu Feb 6 16:48:31 2025 +0800

    [ISSUE #9149]Assign offset in offsetTable even if the subscription key not exist. (#9150)
    
    * Assign offset in offsetTable even if the subscription key not exist.
---
 .../broker/offset/ConsumerOffsetManager.java         | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ea46f1d8a1..85bc8e3789 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.offset;
 
+import com.google.common.collect.Maps;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -417,27 +418,14 @@ public class ConsumerOffsetManager extends ConfigManager {
         }
 
         String key = topic + TOPIC_GROUP_SEPARATOR + group;
-        ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
-        if (null == map) {
-            map = new ConcurrentHashMap<Integer, Long>();
-            ConcurrentMap<Integer, Long> previous = 
resetOffsetTable.putIfAbsent(key, map);
-            if (null != previous) {
-                map = previous;
-            }
-        }
-
-        map.put(queueId, offset);
-        LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, 
resetOffset={}",
-            topic, group, queueId, offset);
+        resetOffsetTable.computeIfAbsent(key, k -> 
Maps.newConcurrentMap()).put(queueId, offset);
+        LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, 
resetOffset={}", topic, group, queueId, offset);
 
         // Two things are important here:
         // 1, currentOffsetMap might be null if there is no previous records;
         // 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
         // sense in cases like clients are offline.
-        ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
-        if (null != currentOffsetMap) {
-            currentOffsetMap.put(queueId, offset);
-        }
+        offsetTable.computeIfAbsent(key, k -> 
Maps.newConcurrentMap()).put(queueId, offset);
     }
 
     public boolean hasOffsetReset(String topic, String group, int queueId) {

Reply via email to