This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7ef36102ea [ISSUE #9351] Add topic-group mapping in 
queryTopicConsumeByWho command (#9352)
7ef36102ea is described below

commit 7ef36102eae5c1248354942a509d39c7a4942006
Author: yangguodong <1174533...@qq.com>
AuthorDate: Wed Apr 23 13:51:01 2025 +0800

    [ISSUE #9351] Add topic-group mapping in queryTopicConsumeByWho command 
(#9352)
---
 .../rocketmq/broker/client/ConsumerManager.java    | 48 ++++++++++++++++++----
 1 file changed, 40 insertions(+), 8 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index c658b128eb..04238e2c30 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -42,6 +42,8 @@ public class ConsumerManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
         new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<String, Set<String>> topicGroupTable =
+            new ConcurrentHashMap<>(1024);
     private final ConcurrentMap<String, ConsumerGroupInfo> 
consumerCompensationTable =
         new ConcurrentHashMap<>(1024);
     private final List<ConsumerIdsChangeListener> 
consumerIdsChangeListenerList = new CopyOnWriteArrayList<>();
@@ -156,6 +158,7 @@ public class ConsumerManager {
                             LOGGER.info("unregister consumer ok, no any 
connection, and remove consumer group, {}",
                                     group);
                             
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
+                            clearTopicGroupTable(remove);
                         }
                     }
                     callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, 
group, consumerGroupInfo.getAllChannel());
@@ -177,6 +180,7 @@ public class ConsumerManager {
                         LOGGER.info("unregister consumer ok, no any 
connection, and remove consumer group, {}",
                             next.getKey());
                         
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
+                        clearTopicGroupTable(remove);
                     }
                 }
                 if (!isBroadcastMode(info.getMessageModel())) {
@@ -187,6 +191,18 @@ public class ConsumerManager {
         return removed;
     }
 
+    /**
+     * Removes the given consumer group from the topic-to-groups index for every
+     * topic returned by {@code groupInfo.getSubscribeTopics()}, and drops a
+     * topic's entry once its group set has become empty.
+     *
+     * @param groupInfo the consumer group that has just been removed
+     */
+    private void clearTopicGroupTable(final ConsumerGroupInfo groupInfo) {
+        for (String topic : groupInfo.getSubscribeTopics()) {
+            Set<String> subscribers = this.topicGroupTable.get(topic);
+            if (subscribers == null) {
+                // Nothing indexed under this topic; move on to the next one.
+                continue;
+            }
+            subscribers.remove(groupInfo.getGroupName());
+            if (subscribers.isEmpty()) {
+                this.topicGroupTable.remove(topic);
+            }
+        }
+    }
+
     // compensate consumer info for consumer without heartbeat
     public void compensateBasicConsumerInfo(String group, ConsumeType 
consumeType, MessageModel messageModel) {
         ConsumerGroupInfo consumerGroupInfo = 
consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
@@ -218,6 +234,16 @@ public class ConsumerManager {
             consumerGroupInfo = prev != null ? prev : tmp;
         }
 
+        // Index this consumer group under every topic it subscribes to, so that
+        // queryTopicConsumeByWho can answer with a direct lookup by topic.
+        for (SubscriptionData subscriptionData : subList) {
+            Set<String> groups = this.topicGroupTable.get(subscriptionData.getTopic());
+            if (groups == null) {
+                // The set is shared by every caller registering or querying this
+                // topic, so it must tolerate concurrent access; a plain HashSet
+                // does not.
+                Set<String> tmp = ConcurrentHashMap.newKeySet();
+                Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp);
+                groups = prev != null ? prev : tmp;
+            }
+            // The set holds consumer group names (clearTopicGroupTable removes
+            // getGroupName() from it), so record the group, not the topic.
+            groups.add(group);
+        }
+
         boolean r1 =
             consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, 
messageModel,
                 consumeFromWhere);
@@ -258,6 +284,17 @@ public class ConsumerManager {
             ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, 
tmp);
             consumerGroupInfo = prev != null ? prev : tmp;
         }
+
+        // Index this consumer group under every topic already present in its
+        // subscription table, so that queryTopicConsumeByWho can answer with a
+        // direct lookup by topic.
+        for (SubscriptionData subscriptionData : consumerGroupInfo.getSubscriptionTable().values()) {
+            Set<String> groups = this.topicGroupTable.get(subscriptionData.getTopic());
+            if (groups == null) {
+                // The set is shared by every caller registering or querying this
+                // topic, so it must tolerate concurrent access; a plain HashSet
+                // does not.
+                Set<String> tmp = ConcurrentHashMap.newKeySet();
+                Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp);
+                groups = prev != null ? prev : tmp;
+            }
+            // The set holds consumer group names (clearTopicGroupTable removes
+            // getGroupName() from it), so record the group, not the topic.
+            groups.add(group);
+        }
+
         boolean updateChannelRst = 
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, 
consumeFromWhere);
         if (updateChannelRst && isNotifyConsumerIdsChangedEnable && 
!isBroadcastMode(consumerGroupInfo.getMessageModel())) {
             callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, 
consumerGroupInfo.getAllChannel());
@@ -282,6 +319,7 @@ public class ConsumerManager {
                     LOGGER.info("unregister consumer ok, no any connection, 
and remove consumer group, {}", group);
 
                     
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
+                    clearTopicGroupTable(remove);
                 }
             }
             if (isNotifyConsumerIdsChangedEnable && 
!isBroadcastMode(consumerGroupInfo.getMessageModel())) {
@@ -349,14 +387,8 @@ public class ConsumerManager {
 
     public HashSet<String> queryTopicConsumeByWho(final String topic) {
+        // Returns a fresh, caller-owned set of the consumer group names indexed
+        // under the given topic; empty when the topic has no entry.
         HashSet<String> groups = new HashSet<>();
-        Iterator<Entry<String, ConsumerGroupInfo>> it = 
this.consumerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConsumerGroupInfo> entry = it.next();
-            ConcurrentMap<String, SubscriptionData> subscriptionTable =
-                entry.getValue().getSubscriptionTable();
-            if (subscriptionTable.containsKey(topic)) {
-                groups.add(entry.getKey());
-            }
+        // Look the entry up exactly once: clearTopicGroupTable may remove it
+        // concurrently, and a second get() after the null check could then
+        // return null and make addAll throw a NullPointerException.
+        Set<String> indexed = this.topicGroupTable.get(topic);
+        if (indexed != null) {
+            groups.addAll(indexed);
         }
         return groups;
     }

Reply via email to