This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 7ef36102ea [ISSUE #9351] Add topic-group mapping in queryTopicConsumeByWho command (#9352) 7ef36102ea is described below commit 7ef36102eae5c1248354942a509d39c7a4942006 Author: yangguodong <1174533...@qq.com> AuthorDate: Wed Apr 23 13:51:01 2025 +0800 [ISSUE #9351] Add topic-group mapping in queryTopicConsumeByWho command (#9352) --- .../rocketmq/broker/client/ConsumerManager.java | 48 ++++++++++++++++++---- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index c658b128eb..04238e2c30 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -42,6 +42,8 @@ public class ConsumerManager { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<>(1024); + private final ConcurrentMap<String, Set<String>> topicGroupTable = + new ConcurrentHashMap<>(1024); private final ConcurrentMap<String, ConsumerGroupInfo> consumerCompensationTable = new ConcurrentHashMap<>(1024); private final List<ConsumerIdsChangeListener> consumerIdsChangeListenerList = new CopyOnWriteArrayList<>(); @@ -156,6 +158,7 @@ public class ConsumerManager { LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group); + clearTopicGroupTable(remove); } } callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); @@ -177,6 +180,7 @@ public class ConsumerManager { LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", next.getKey()); callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey()); + clearTopicGroupTable(remove); } } if (!isBroadcastMode(info.getMessageModel())) { @@ -187,6 +191,18 @@ public class ConsumerManager { return removed; } + private void clearTopicGroupTable(final ConsumerGroupInfo groupInfo) { + for (String subscribeTopic : groupInfo.getSubscribeTopics()) { + Set<String> groups = this.topicGroupTable.get(subscribeTopic); + if (groups != null) { + groups.remove(groupInfo.getGroupName()); + } + if (groups != null && groups.isEmpty()) { + this.topicGroupTable.remove(subscribeTopic); + } + } + } + // compensate consumer info for consumer without heartbeat public void compensateBasicConsumerInfo(String group, ConsumeType consumeType, MessageModel messageModel) { ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new); @@ -218,6 +234,16 @@ public class ConsumerManager { consumerGroupInfo = prev != null ? prev : tmp; } + for (SubscriptionData subscriptionData : subList) { + Set<String> groups = this.topicGroupTable.get(subscriptionData.getTopic()); + if (groups == null) { + Set<String> tmp = new HashSet<>(); + Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp); + groups = prev != null ? prev : tmp; + } + groups.add(subscriptionData.getTopic()); + } + boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); @@ -258,6 +284,17 @@ public class ConsumerManager { ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } + + for (SubscriptionData subscriptionData : consumerGroupInfo.getSubscriptionTable().values()) { + Set<String> groups = this.topicGroupTable.get(subscriptionData.getTopic()); + if (groups == null) { + Set<String> tmp = new HashSet<>(); + Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp); + groups = prev != null ? prev : tmp; + } + groups.add(subscriptionData.getTopic()); + } + boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); @@ -282,6 +319,7 @@ public class ConsumerManager { LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group); + clearTopicGroupTable(remove); } } if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { @@ -349,14 +387,8 @@ public class ConsumerManager { public HashSet<String> queryTopicConsumeByWho(final String topic) { HashSet<String> groups = new HashSet<>(); - Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConsumerGroupInfo> entry = it.next(); - ConcurrentMap<String, SubscriptionData> subscriptionTable = - entry.getValue().getSubscriptionTable(); - if (subscriptionTable.containsKey(topic)) { - groups.add(entry.getKey()); - } + if (this.topicGroupTable.get(topic) != null) { + groups.addAll(this.topicGroupTable.get(topic)); } return groups; }