This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 895489ddb6 [ISSUE #9282] Optimize BrokerController#printWaterMark 
(#9283)
895489ddb6 is described below

commit 895489ddb6dcca74229bdf3f44af937e13f1597f
Author: yx9o <yangx_s...@163.com>
AuthorDate: Mon Mar 31 10:49:24 2025 +0800

    [ISSUE #9282] Optimize BrokerController#printWaterMark (#9283)
---
 .../apache/rocketmq/broker/BrokerController.java   | 39 +++++++++++++++++-----
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a715ec3a4e..c6163499a9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.acl.plain.PlainAccessValidator;
@@ -1284,16 +1285,36 @@ public class BrokerController {
         return this.headSlowTimeMills(this.ackThreadPoolQueue);
     }
 
+    public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
+        return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
+    }
+
+    public long headSlowTimeMills4ClientManagerThreadPoolQueue() {
+        return this.headSlowTimeMills(this.clientManagerThreadPoolQueue);
+    }
+
+    public long headSlowTimeMills4HeartbeatThreadPoolQueue() {
+        return this.headSlowTimeMills(this.heartbeatThreadPoolQueue);
+    }
+
+    public long headSlowTimeMills4AdminBrokerThreadPoolQueue() {
+        return this.headSlowTimeMills(this.adminBrokerThreadPoolQueue);
+    }
+
     public void printWaterMark() {
-        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: 
{}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
-        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: 
{}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
-        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: 
{}", this.queryThreadPoolQueue.size(), 
headSlowTimeMills4QueryThreadPoolQueue());
-        LOG_WATER_MARK.info("[WATERMARK] Lite Pull Queue Size: {} 
SlowTimeMills: {}", this.litePullThreadPoolQueue.size(), 
headSlowTimeMills4LitePullThreadPoolQueue());
-        LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} 
SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), 
headSlowTimeMills(this.endTransactionThreadPoolQueue));
-        LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} 
SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), 
this.headSlowTimeMills(this.clientManagerThreadPoolQueue));
-        LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} 
SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), 
this.headSlowTimeMills(this.heartbeatThreadPoolQueue));
-        LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: 
{}", this.ackThreadPoolQueue.size(), 
headSlowTimeMills(this.ackThreadPoolQueue));
-        LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: 
{}", this.adminBrokerThreadPoolQueue.size(), 
headSlowTimeMills(this.adminBrokerThreadPoolQueue));
+        logWaterMarkQueueInfo("Send", this.sendThreadPoolQueue, 
this::headSlowTimeMills4SendThreadPoolQueue);
+        logWaterMarkQueueInfo("Pull", this.pullThreadPoolQueue, 
this::headSlowTimeMills4PullThreadPoolQueue);
+        logWaterMarkQueueInfo("Query", this.queryThreadPoolQueue, 
this::headSlowTimeMills4QueryThreadPoolQueue);
+        logWaterMarkQueueInfo("Lite Pull", this.litePullThreadPoolQueue, 
this::headSlowTimeMills4LitePullThreadPoolQueue);
+        logWaterMarkQueueInfo("Transaction", 
this.endTransactionThreadPoolQueue, 
this::headSlowTimeMills4EndTransactionThreadPoolQueue);
+        logWaterMarkQueueInfo("ClientManager", 
this.clientManagerThreadPoolQueue, 
this::headSlowTimeMills4ClientManagerThreadPoolQueue);
+        logWaterMarkQueueInfo("Heartbeat", this.heartbeatThreadPoolQueue, 
this::headSlowTimeMills4HeartbeatThreadPoolQueue);
+        logWaterMarkQueueInfo("Ack", this.ackThreadPoolQueue, 
this::headSlowTimeMills4AckThreadPoolQueue);
+        logWaterMarkQueueInfo("Admin", this.adminBrokerThreadPoolQueue, 
this::headSlowTimeMills4AdminBrokerThreadPoolQueue);
+    }
+
+    private void logWaterMarkQueueInfo(String queueName, BlockingQueue<?> 
queue, Supplier<Long> slowTimeSupplier) {
+        LOG_WATER_MARK.info("[WATERMARK] {} Queue Size: {} SlowTimeMills: {}", 
queueName, queue.size(), slowTimeSupplier.get());
     }
 
     public MessageStore getMessageStore() {

Reply via email to