This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5b474d9d3 [ISSUE #9297] Supports outputting topic put TPS in 
TopicStatusSubCommand (#9298)
d5b474d9d3 is described below

commit d5b474d9d326f380d736abe700f8a7a37fe364e2
Author: ymwneu <ymw...@126.com>
AuthorDate: Wed Apr 2 18:59:45 2025 +0800

    [ISSUE #9297] Supports outputting topic put TPS in TopicStatusSubCommand 
(#9298)
---
 .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java |  1 +
 .../rocketmq/remoting/protocol/admin/TopicStatsTable.java      | 10 ++++++++++
 .../org/apache/rocketmq/store/stats/BrokerStatsManager.java    |  4 ++++
 .../org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java |  1 +
 .../rocketmq/tools/command/topic/TopicStatusSubCommand.java    |  2 ++
 5 files changed, 18 insertions(+)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4ff4bed814..4200f34bde 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1837,6 +1837,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                 topicStatsTable.getOffsetTable().put(mq, topicOffset);
             }
 
+            
topicStatsTable.setTopicPutTps(this.brokerController.getBrokerStatsManager().tpsTopicPutNums(requestHeader.getTopic()));
             byte[] body = topicStatsTable.encode();
             response.setBody(body);
             response.setCode(ResponseCode.SUCCESS);
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
index 9f467e7449..5cb2af6373 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicStatsTable extends RemotingSerializable {
+    private double topicPutTps;
+
     private Map<MessageQueue, TopicOffset> offsetTable = new 
ConcurrentHashMap<>();
 
     public Map<MessageQueue, TopicOffset> getOffsetTable() {
@@ -31,4 +33,12 @@ public class TopicStatsTable extends RemotingSerializable {
     public void setOffsetTable(Map<MessageQueue, TopicOffset> offsetTable) {
         this.offsetTable = offsetTable;
     }
+
+    public double getTopicPutTps() {
+        return topicPutTps;
+    }
+
+    public void setTopicPutTps(double topicPutTps) {
+        this.topicPutTps = topicPutTps;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 530339c23b..c272a30234 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -595,6 +595,10 @@ public class BrokerStatsManager {
         this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
     }
 
+    public double tpsTopicPutNums(final String topic) {
+        return 
this.statsTable.get(TOPIC_PUT_NUMS).getStatsDataInMinute(topic).getTps();
+    }
+
     public double tpsGroupGetNums(final String group, final String topic) {
         final String statsKey = buildStatsKey(topic, group);
         return 
this.statsTable.get(Stats.GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 9afd37f784..e6405cb2d9 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -367,6 +367,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
                                 if (addr != null) {
                                     TopicStatsTable tst = 
mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, 
timeoutMillis);
                                     
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+                                    
topicStatsTable.setTopicPutTps(topicStatsTable.getTopicPutTps() + 
tst.getTopicPutTps());
                                 }
                             } catch (Exception e) {
                                 logger.error("getTopicStatsInfo error. 
topic={}", topic, e);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
index 47ca761d1f..ff91399f1c 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -113,6 +113,8 @@ public class TopicStatusSubCommand implements SubCommand {
                     humanTimestamp
                 );
             }
+            System.out.printf("%n");
+            System.out.printf("Topic Put TPS: %s%n", 
topicStatsTable.getTopicPutTps());
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
         } finally {

Reply via email to