This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 895489ddb6 [ISSUE #9282] Optimize BrokerController#printWaterMark (#9283) 895489ddb6 is described below commit 895489ddb6dcca74229bdf3f44af937e13f1597f Author: yx9o <yangx_s...@163.com> AuthorDate: Mon Mar 31 10:49:24 2025 +0800 [ISSUE #9282] Optimize BrokerController#printWaterMark (#9283) --- .../apache/rocketmq/broker/BrokerController.java | 39 +++++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a715ec3a4e..c6163499a9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; @@ -1284,16 +1285,36 @@ public class BrokerController { return this.headSlowTimeMills(this.ackThreadPoolQueue); } + public long headSlowTimeMills4EndTransactionThreadPoolQueue() { + return this.headSlowTimeMills(this.endTransactionThreadPoolQueue); + } + + public long headSlowTimeMills4ClientManagerThreadPoolQueue() { + return this.headSlowTimeMills(this.clientManagerThreadPoolQueue); + } + + public long headSlowTimeMills4HeartbeatThreadPoolQueue() { + return this.headSlowTimeMills(this.heartbeatThreadPoolQueue); + } + + public long headSlowTimeMills4AdminBrokerThreadPoolQueue() { + return this.headSlowTimeMills(this.adminBrokerThreadPoolQueue); + } + public void printWaterMark() { - LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), 
headSlowTimeMills4SendThreadPoolQueue()); - LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); - LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); - LOG_WATER_MARK.info("[WATERMARK] Lite Pull Queue Size: {} SlowTimeMills: {}", this.litePullThreadPoolQueue.size(), headSlowTimeMills4LitePullThreadPoolQueue()); - LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills(this.endTransactionThreadPoolQueue)); - LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), this.headSlowTimeMills(this.clientManagerThreadPoolQueue)); - LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), this.headSlowTimeMills(this.heartbeatThreadPoolQueue)); - LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}", this.ackThreadPoolQueue.size(), headSlowTimeMills(this.ackThreadPoolQueue)); - LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}", this.adminBrokerThreadPoolQueue.size(), headSlowTimeMills(this.adminBrokerThreadPoolQueue)); + logWaterMarkQueueInfo("Send", this.sendThreadPoolQueue, this::headSlowTimeMills4SendThreadPoolQueue); + logWaterMarkQueueInfo("Pull", this.pullThreadPoolQueue, this::headSlowTimeMills4PullThreadPoolQueue); + logWaterMarkQueueInfo("Query", this.queryThreadPoolQueue, this::headSlowTimeMills4QueryThreadPoolQueue); + logWaterMarkQueueInfo("Lite Pull", this.litePullThreadPoolQueue, this::headSlowTimeMills4LitePullThreadPoolQueue); + logWaterMarkQueueInfo("Transaction", this.endTransactionThreadPoolQueue, this::headSlowTimeMills4EndTransactionThreadPoolQueue); + logWaterMarkQueueInfo("ClientManager", 
this.clientManagerThreadPoolQueue, this::headSlowTimeMills4ClientManagerThreadPoolQueue); + logWaterMarkQueueInfo("Heartbeat", this.heartbeatThreadPoolQueue, this::headSlowTimeMills4HeartbeatThreadPoolQueue); + logWaterMarkQueueInfo("Ack", this.ackThreadPoolQueue, this::headSlowTimeMills4AckThreadPoolQueue); + logWaterMarkQueueInfo("Admin", this.adminBrokerThreadPoolQueue, this::headSlowTimeMills4AdminBrokerThreadPoolQueue); + } + + private void logWaterMarkQueueInfo(String queueName, BlockingQueue<?> queue, Supplier<Long> slowTimeSupplier) { + LOG_WATER_MARK.info("[WATERMARK] {} Queue Size: {} SlowTimeMills: {}", queueName, queue.size(), slowTimeSupplier.get()); } public MessageStore getMessageStore() {