This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d5b474d9d3 [ISSUE #9297] Supports outputting topic put TPS in TopicStatusSubCommand (#9298) d5b474d9d3 is described below commit d5b474d9d326f380d736abe700f8a7a37fe364e2 Author: ymwneu <ymw...@126.com> AuthorDate: Wed Apr 2 18:59:45 2025 +0800 [ISSUE #9297] Supports outputting topic put TPS in TopicStatusSubCommand (#9298) --- .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 1 + .../rocketmq/remoting/protocol/admin/TopicStatsTable.java | 10 ++++++++++ .../org/apache/rocketmq/store/stats/BrokerStatsManager.java | 4 ++++ .../org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 1 + .../rocketmq/tools/command/topic/TopicStatusSubCommand.java | 2 ++ 5 files changed, 18 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4ff4bed814..4200f34bde 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1837,6 +1837,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { topicStatsTable.getOffsetTable().put(mq, topicOffset); } + topicStatsTable.setTopicPutTps(this.brokerController.getBrokerStatsManager().tpsTopicPutNums(requestHeader.getTopic())); byte[] body = topicStatsTable.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java index 9f467e7449..5cb2af6373 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java @@ -22,6 +22,8 @@ import 
org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class TopicStatsTable extends RemotingSerializable { + private double topicPutTps; + private Map<MessageQueue, TopicOffset> offsetTable = new ConcurrentHashMap<>(); public Map<MessageQueue, TopicOffset> getOffsetTable() { @@ -31,4 +33,12 @@ public class TopicStatsTable extends RemotingSerializable { public void setOffsetTable(Map<MessageQueue, TopicOffset> offsetTable) { this.offsetTable = offsetTable; } + + public double getTopicPutTps() { + return topicPutTps; + } + + public void setTopicPutTps(double topicPutTps) { + this.topicPutTps = topicPutTps; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 530339c23b..c272a30234 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -595,6 +595,10 @@ public class BrokerStatsManager { this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1); } + public double tpsTopicPutNums(final String topic) { + return this.statsTable.get(TOPIC_PUT_NUMS).getStatsDataInMinute(topic).getTps(); + } + public double tpsGroupGetNums(final String group, final String topic) { final String statsKey = buildStatsKey(topic, group); return this.statsTable.get(Stats.GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 9afd37f784..e6405cb2d9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -367,6 +367,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner { if (addr != null) { TopicStatsTable tst = mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis); topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable()); + topicStatsTable.setTopicPutTps(topicStatsTable.getTopicPutTps() + tst.getTopicPutTps()); } } catch (Exception e) { logger.error("getTopicStatsInfo error. topic={}", topic, e); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java index 47ca761d1f..ff91399f1c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java @@ -113,6 +113,8 @@ public class TopicStatusSubCommand implements SubCommand { humanTimestamp ); } + System.out.printf("%n"); + System.out.printf("Topic Put TPS: %s%n", topicStatsTable.getTopicPutTps()); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally {