This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8086fc545d [ISSUE #9152] Broker getConsumeStats supports inputting multiple topics (#9153)
8086fc545d is described below

commit 8086fc545d1e403c52d770a15e7be7c247e849b0
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Wed Feb 5 14:41:53 2025 +0800

    [ISSUE #9152] Broker getConsumeStats supports inputting multiple topics (#9153)
    
    * [ISSUE #9152] The getConsumeStats supports inputting multiple topics
---
 .../broker/processor/AdminBrokerProcessor.java     |  60 ++++++----
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  15 ++-
 .../ExportRocksDBConfigToJsonRequestHeader.java    |   3 +-
 .../header/GetConsumeStatsRequestHeader.java       |  37 ++++++-
 .../header/GetConsumeStatsRequestHeaderTest.java   | 123 +++++++++++++++++++++
 .../tools/admin/DefaultMQAdminExtTest.java         |   2 +-
 6 files changed, 213 insertions(+), 27 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a9b913192f..2247e90f56 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1947,16 +1947,14 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         try {
             final GetConsumeStatsRequestHeader requestHeader = 
request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
-            ConsumeStats consumeStats = new ConsumeStats();
+            List<String> topicListProvided = requestHeader.fetchTopicList();
+            String topicProvided = requestHeader.getTopic();
+            String group = requestHeader.getConsumerGroup();
 
-            Set<String> topics = new HashSet<>();
-            if (UtilAll.isBlank(requestHeader.getTopic())) {
-                topics = 
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
-            } else {
-                topics.add(requestHeader.getTopic());
-            }
+            ConsumeStats consumeStats = new ConsumeStats();
+            Set<String> topicsForCollecting = 
getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group);
 
-            for (String topic : topics) {
+            for (String topic : topicsForCollecting) {
                 TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
                 if (null == topicConfig) {
                     LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic 
config does not exist, topic={}", topic);
@@ -1964,20 +1962,6 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                 }
 
                 TopicQueueMappingDetail mappingDetail = 
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
-
-                {
-                    SubscriptionData findSubscriptionData =
-                        
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(),
 topic);
-
-                    if (null == findSubscriptionData
-                        && 
this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup())
 > 0) {
-                        LOGGER.warn(
-                            "AdminBrokerProcessor#getConsumeStats: topic does 
not exist in consumer group's subscription, "
-                                + "topic={}, consumer group={}", topic, 
requestHeader.getConsumerGroup());
-                        continue;
-                    }
-                }
-
                 for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                     MessageQueue mq = new MessageQueue();
                     mq.setTopic(topic);
@@ -2038,6 +2022,38 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
+    private Set<String> getTopicsForCollectingConsumeStats(List<String> 
topicListProvided, String topicProvided,
+        String group) {
+        Set<String> topicsForCollecting = new HashSet<>();
+        if (!topicListProvided.isEmpty()) {
+            // if topic list is provided, only collect the topics in the list
+            // and ignore subscription check
+            topicsForCollecting.addAll(topicListProvided);
+        } else {
+            // In order to be compatible with the old logic,
+            // even if the topic has been provided here, the subscription will 
be checked.
+            if (UtilAll.isBlank(topicProvided)) {
+                topicsForCollecting.addAll(
+                    
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
+            } else {
+                topicsForCollecting.add(topicProvided);
+            }
+            int subscriptionCount = 
this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
+            Iterator<String> iterator = topicsForCollecting.iterator();
+            while (iterator.hasNext()) {
+                String topic = iterator.next();
+                SubscriptionData findSubscriptionData = 
this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
+                if (findSubscriptionData == null && subscriptionCount > 0) {
+                    LOGGER.warn(
+                        "AdminBrokerProcessor#getConsumeStats: topic does not 
exist in consumer group's subscription, topic={}, consumer group={}",
+                        topic, group);
+                    iterator.remove();
+                }
+            }
+        }
+        return topicsForCollecting;
+    }
+
     private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, 
RemotingCommand request) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 114093e350..bed6c1c476 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1748,16 +1748,27 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
     public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final long timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
         MQBrokerException {
-        return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
+        return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
+    }
+
+    public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final List<String> topicList,
+        final long timeoutMillis) throws RemotingSendRequestException, 
RemotingConnectException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
+        return getConsumeStats(addr, consumerGroup, null, topicList, 
timeoutMillis);
     }
 
     public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final String topic,
-        final long timeoutMillis)
+        final long timeoutMillis) throws RemotingSendRequestException, 
RemotingConnectException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
+        return getConsumeStats(addr, consumerGroup, topic, null, 
timeoutMillis);
+    }
+
+    public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final String topic,
+        final List<String> topicList, final long timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
         MQBrokerException {
         GetConsumeStatsRequestHeader requestHeader = new 
GetConsumeStatsRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setTopic(topic);
+        requestHeader.updateTopicList(topicList);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, 
requestHeader);
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
index 7b1f9470e1..8354f83053 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java
@@ -21,12 +21,13 @@ import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.action.Action;
 import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.common.resource.ResourceType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 
-@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = 
Action.GET)
+@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = 
ResourceType.CLUSTER, action = Action.GET)
 public class ExportRocksDBConfigToJsonRequestHeader implements 
CommandCustomHeader {
     private static final String CONFIG_TYPE_SEPARATOR = ";";
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
index 51a46879e8..2c51c3f529 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java
@@ -17,27 +17,62 @@
 package org.apache.rocketmq.remoting.protocol.header;
 
 import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.action.Action;
 import org.apache.rocketmq.common.action.RocketMQAction;
 import org.apache.rocketmq.common.resource.ResourceType;
 import org.apache.rocketmq.common.resource.RocketMQResource;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
 
 @RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
 public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
+    private static final String TOPIC_NAME_SEPARATOR = ";";
+
     @CFNotNull
     @RocketMQResource(ResourceType.GROUP)
     private String consumerGroup;
+
     @RocketMQResource(ResourceType.TOPIC)
     private String topic;
 
+    // if topicList is provided, topic will be ignored
+    @RocketMQResource(value = ResourceType.TOPIC, splitter = 
TOPIC_NAME_SEPARATOR)
+    private String topicList;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
 
+    public List<String> fetchTopicList() {
+        if (StringUtils.isBlank(topicList)) {
+            return Collections.emptyList();
+        }
+        return Arrays.asList(StringUtils.split(topicList, 
TOPIC_NAME_SEPARATOR));
+    }
+
+    public void updateTopicList(List<String> topicList) {
+        if (topicList == null || topicList.isEmpty()) {
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        topicList.forEach(topic -> 
sb.append(topic).append(TOPIC_NAME_SEPARATOR));
+        this.setTopicList(sb.toString());
+    }
+
+    public String getTopicList() {
+        return topicList;
+    }
+
+    public void setTopicList(String topicList) {
+        this.topicList = topicList;
+    }
+
     public String getConsumerGroup() {
         return consumerGroup;
     }
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java
new file mode 100644
index 0000000000..8004305e17
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class GetConsumeStatsRequestHeaderTest {
+
+    private GetConsumeStatsRequestHeader header;
+
+    @Before
+    public void setUp() {
+        header = new GetConsumeStatsRequestHeader();
+    }
+
+    @Test
+    public void updateTopicList_NullTopicList_DoesNotUpdate() {
+        header.updateTopicList(null);
+        assertNull(header.getTopicList());
+    }
+
+    @Test
+    public void updateTopicList_EmptyTopicList_SetsEmptyString() {
+        header.updateTopicList(Collections.emptyList());
+        assertNull(header.getTopicList());
+    }
+
+    @Test
+    public void updateTopicList_SingleTopic_SetsSingleTopicString() {
+        List<String> topicList = Collections.singletonList("TopicA");
+        header.updateTopicList(topicList);
+        assertEquals("TopicA;", header.getTopicList());
+    }
+
+    @Test
+    public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() {
+        List<String> topicList = Arrays.asList("TopicA", "TopicB", "TopicC");
+        header.updateTopicList(topicList);
+        assertEquals("TopicA;TopicB;TopicC;", header.getTopicList());
+    }
+
+    @Test
+    public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() {
+        List<String> topicList = Arrays.asList("TopicA", "TopicA", "TopicB");
+        header.updateTopicList(topicList);
+        assertEquals("TopicA;TopicA;TopicB;", header.getTopicList());
+    }
+
+    @Test
+    public void fetchTopicList_NullTopicList_ReturnsEmptyList() {
+        header.setTopicList(null);
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Collections.emptyList(), topicList);
+
+        header.updateTopicList(new ArrayList<>());
+        topicList = header.fetchTopicList();
+        assertEquals(Collections.emptyList(), topicList);
+    }
+
+    @Test
+    public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() {
+        header.setTopicList("");
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Collections.emptyList(), topicList);
+    }
+
+    @Test
+    public void fetchTopicList_BlankTopicList_ReturnsEmptyList() {
+        header.setTopicList("   ");
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Collections.emptyList(), topicList);
+    }
+
+    @Test
+    public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() {
+        header.setTopicList("TopicA");
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Collections.singletonList("TopicA"), topicList);
+    }
+
+    @Test
+    public void fetchTopicList_MultipleTopics_ReturnsTopicList() {
+        header.setTopicList("TopicA;TopicB;TopicC");
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList);
+    }
+
+    @Test
+    public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() {
+        header.setTopicList("TopicA;TopicB;");
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
+    }
+
+    @Test
+    public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList() 
{
+        header.setTopicList(";TopicA;TopicB");
+        List<String> topicList = header.fetchTopicList();
+        assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index dc5642f88c..ec5f7571d2 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -456,7 +456,7 @@ public class DefaultMQAdminExtTest {
         connection.setConnectionSet(connections);
         when(mQClientAPIImpl.getConsumerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(connection);
         ConsumeStats consumeStats = new ConsumeStats();
-        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
isNull(), anyLong())).thenReturn(consumeStats);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
(String) isNull(), anyLong())).thenReturn(consumeStats);
         List<MessageTrack> broadcastMessageTracks = 
defaultMQAdminExt.messageTrackDetail(messageExt);
         assertThat(broadcastMessageTracks.size()).isEqualTo(2);
         
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);

Reply via email to