This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 8086fc545d [ISSUE #9152] Broker getConsumeStats supports inputting multiple topics (#9153) 8086fc545d is described below commit 8086fc545d1e403c52d770a15e7be7c247e849b0 Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Wed Feb 5 14:41:53 2025 +0800 [ISSUE #9152] Broker getConsumeStats supports inputting multiple topics (#9153) * [ISSUE #9152] The getConsumeStats supports inputting multiple topics --- .../broker/processor/AdminBrokerProcessor.java | 60 ++++++---- .../rocketmq/client/impl/MQClientAPIImpl.java | 15 ++- .../ExportRocksDBConfigToJsonRequestHeader.java | 3 +- .../header/GetConsumeStatsRequestHeader.java | 37 ++++++- .../header/GetConsumeStatsRequestHeaderTest.java | 123 +++++++++++++++++++++ .../tools/admin/DefaultMQAdminExtTest.java | 2 +- 6 files changed, 213 insertions(+), 27 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a9b913192f..2247e90f56 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1947,16 +1947,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final RemotingCommand response = RemotingCommand.createResponseCommand(null); try { final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); - ConsumeStats consumeStats = new ConsumeStats(); + List<String> topicListProvided = requestHeader.fetchTopicList(); + String topicProvided = requestHeader.getTopic(); + String group = requestHeader.getConsumerGroup(); - Set<String> topics = new HashSet<>(); - if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); - } else { - topics.add(requestHeader.getTopic()); - } + ConsumeStats consumeStats = new ConsumeStats(); + Set<String> topicsForCollecting = getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group); - for (String topic : topics) { + for (String topic : topicsForCollecting) { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic); @@ -1964,20 +1962,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); - - { - SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); - - if (null == findSubscriptionData - && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { - LOGGER.warn( - "AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, " - + "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup()); - continue; - } - } - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(); mq.setTopic(topic); @@ -2038,6 +2022,38 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } + private Set<String> getTopicsForCollectingConsumeStats(List<String> topicListProvided, String topicProvided, + String group) { + Set<String> topicsForCollecting = new HashSet<>(); + if (!topicListProvided.isEmpty()) { + // if topic list is provided, only collect the topics in the list + // and ignore subscription check + topicsForCollecting.addAll(topicListProvided); + } else { + // In order to be compatible with the old logic, + // even if the topic has been provided here, the subscription will be checked. + if (UtilAll.isBlank(topicProvided)) { + topicsForCollecting.addAll( + this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group)); + } else { + topicsForCollecting.add(topicProvided); + } + int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group); + Iterator<String> iterator = topicsForCollecting.iterator(); + while (iterator.hasNext()) { + String topic = iterator.next(); + SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); + if (findSubscriptionData == null && subscriptionCount > 0) { + LOGGER.warn( + "AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}", + topic, group); + iterator.remove(); + } + } + } + return topicsForCollecting; + } + private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 114093e350..bed6c1c476 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1748,16 +1748,27 @@ public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdo public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return getConsumeStats(addr, consumerGroup, null, timeoutMillis); + return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis); + } + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList, + final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis); } public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, - final long timeoutMillis) + final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis); + } + + public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, + final List<String> topicList, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(topic); + requestHeader.updateTopicList(topicList); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java index 7b1f9470e1..8354f83053 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java @@ -21,12 +21,13 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.protocol.RequestCode; -@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET) +@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET) public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader { private static final String CONFIG_TYPE_SEPARATOR = ";"; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java index 51a46879e8..2c51c3f529 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java @@ -17,27 +17,62 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; import org.apache.rocketmq.common.resource.ResourceType; import org.apache.rocketmq.common.resource.RocketMQResource; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; @RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET) public class GetConsumeStatsRequestHeader extends TopicRequestHeader { + private static final String TOPIC_NAME_SEPARATOR = ";"; + @CFNotNull @RocketMQResource(ResourceType.GROUP) private String consumerGroup; + @RocketMQResource(ResourceType.TOPIC) private String topic; + // if topicList is provided, topic will be ignored + @RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR) + private String topicList; + @Override public void checkFields() throws RemotingCommandException { } + public List<String> fetchTopicList() { + if (StringUtils.isBlank(topicList)) { + return Collections.emptyList(); + } + return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR)); + } + + public void updateTopicList(List<String> topicList) { + if (topicList == null || topicList.isEmpty()) { + return; + } + StringBuilder sb = new StringBuilder(); + topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR)); + this.setTopicList(sb.toString()); + } + + public String getTopicList() { + return topicList; + } + + public void setTopicList(String topicList) { + this.topicList = topicList; + } + public String getConsumerGroup() { return consumerGroup; } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java new file mode 100644 index 0000000000..8004305e17 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeaderTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class GetConsumeStatsRequestHeaderTest { + + private GetConsumeStatsRequestHeader header; + + @Before + public void setUp() { + header = new GetConsumeStatsRequestHeader(); + } + + @Test + public void updateTopicList_NullTopicList_DoesNotUpdate() { + header.updateTopicList(null); + assertNull(header.getTopicList()); + } + + @Test + public void updateTopicList_EmptyTopicList_SetsEmptyString() { + header.updateTopicList(Collections.emptyList()); + assertNull(header.getTopicList()); + } + + @Test + public void updateTopicList_SingleTopic_SetsSingleTopicString() { + List<String> topicList = Collections.singletonList("TopicA"); + header.updateTopicList(topicList); + assertEquals("TopicA;", header.getTopicList()); + } + + @Test + public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() { + List<String> topicList = Arrays.asList("TopicA", "TopicB", "TopicC"); + header.updateTopicList(topicList); + assertEquals("TopicA;TopicB;TopicC;", header.getTopicList()); + } + + @Test + public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() { + List<String> topicList = Arrays.asList("TopicA", "TopicA", "TopicB"); + header.updateTopicList(topicList); + assertEquals("TopicA;TopicA;TopicB;", header.getTopicList()); + } + + @Test + public void fetchTopicList_NullTopicList_ReturnsEmptyList() { + header.setTopicList(null); + List<String> topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + + header.updateTopicList(new ArrayList<>()); + topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + } + + @Test + public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() { + header.setTopicList(""); + List<String> topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + } + + @Test + public void fetchTopicList_BlankTopicList_ReturnsEmptyList() { + header.setTopicList(" "); + List<String> topicList = header.fetchTopicList(); + assertEquals(Collections.emptyList(), topicList); + } + + @Test + public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() { + header.setTopicList("TopicA"); + List<String> topicList = header.fetchTopicList(); + assertEquals(Collections.singletonList("TopicA"), topicList); + } + + @Test + public void fetchTopicList_MultipleTopics_ReturnsTopicList() { + header.setTopicList("TopicA;TopicB;TopicC"); + List<String> topicList = header.fetchTopicList(); + assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList); + } + + @Test + public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() { + header.setTopicList("TopicA;TopicB;"); + List<String> topicList = header.fetchTopicList(); + assertEquals(Arrays.asList("TopicA", "TopicB"), topicList); + } + + @Test + public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList() { + header.setTopicList(";TopicA;TopicB"); + List<String> topicList = header.fetchTopicList(); + assertEquals(Arrays.asList("TopicA", "TopicB"), topicList); + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index dc5642f88c..ec5f7571d2 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -456,7 +456,7 @@ public class DefaultMQAdminExtTest { connection.setConnectionSet(connections); when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection); ConsumeStats consumeStats = new ConsumeStats(); - when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats); + when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats); List<MessageTrack> broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt); assertThat(broadcastMessageTracks.size()).isEqualTo(2); assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);