This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 19f799d936 [ISSUE #8340] RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills (#8339) 19f799d936 is described below commit 19f799d93617c17b093c9e372c5141f3ce32b292 Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Wed Feb 26 15:29:55 2025 +0800 [ISSUE #8340] RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills (#8339) * RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills * RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills --- .../org/apache/rocketmq/broker/BrokerController.java | 4 ++++ .../rocketmq/broker/processor/AdminBrokerProcessor.java | 5 +++++ .../tools/command/cluster/ClusterListSubCommand.java | 16 +++++++++------- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 4031dce8d6..a715ec3a4e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1280,6 +1280,10 @@ public class BrokerController { return this.headSlowTimeMills(this.queryThreadPoolQueue); } + public long headSlowTimeMills4AckThreadPoolQueue() { + return this.headSlowTimeMills(this.ackThreadPoolQueue); + } + public void printWaterMark() { LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 2247e90f56..4ff4bed814 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2801,10 +2801,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("queryThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); + runtimeInfo.put("ackThreadPoolQueueSize", String.valueOf(this.brokerController.getAckThreadPoolQueue().size())); + runtimeInfo.put("ackThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getAckThreadPoolQueueCapacity())); + runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(brokerController.headSlowTimeMills4PullThreadPoolQueue())); runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue())); runtimeInfo.put("litePullThreadPoolQueueHeadWaitTimeMills", String.valueOf(brokerController.headSlowTimeMills4LitePullThreadPoolQueue())); + runtimeInfo.put("ackThreadPoolQueueHeadWaitTimeMills", String.valueOf(brokerController.headSlowTimeMills4AckThreadPoolQueue())); runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size())); runtimeInfo.put("EndTransactionThreadPoolQueueCapacity", diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index ede0fa5cf4..8103f4c7f8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -177,9 +177,9 @@ public class ClusterListSubCommand implements SubCommand { } private void printClusterBaseInfo(final Set<String> clusterNames, - final DefaultMQAdminExt defaultMQAdminExt, - final ClusterInfo clusterInfo) { - System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %16s %-22s %-11s %-12s %-8s %-10s%n", + final DefaultMQAdminExt defaultMQAdminExt, + final ClusterInfo clusterInfo) { + System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s %-11s %-12s %-8s %-10s%n", "#Cluster Name", "#Broker Name", "#BID", @@ -212,8 +212,10 @@ public class ClusterListSubCommand implements SubCommand { String version = ""; String sendThreadPoolQueueSize = ""; String pullThreadPoolQueueSize = ""; + String ackThreadPoolQueueSize = ""; String sendThreadPoolQueueHeadWaitTimeMills = ""; String pullThreadPoolQueueHeadWaitTimeMills = ""; + String ackThreadPoolQueueHeadWaitTimeMills = ""; String pageCacheLockTimeMills = ""; String earliestMessageTimeStamp = ""; String commitLogDiskRatio = ""; @@ -228,14 +230,14 @@ public class ClusterListSubCommand implements SubCommand { isBrokerActive = Boolean.parseBoolean(kvTable.getTable().get("brokerActive")); String putTps = kvTable.getTable().get("putTps"); String getTransferredTps = kvTable.getTable().get("getTransferredTps"); - sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize"); - pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize"); sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize"); pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize"); + ackThreadPoolQueueSize = kvTable.getTable().getOrDefault("ackThreadPoolQueueSize", "N"); sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills"); pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills"); + 
ackThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().getOrDefault("ackThreadPoolQueueHeadWaitTimeMills", "N"); pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills"); earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp"); commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio"); @@ -280,14 +282,14 @@ public class ClusterListSubCommand implements SubCommand { space = Double.parseDouble(commitLogDiskRatio); } - System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %16s %-22s %11s %-12s %-8s %10s%n", + System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s %11s %-12s %-8s %10s%n", clusterName, brokerName, next1.getKey(), next1.getValue(), version, String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), - String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), + String.format("%9.2f(%s,%sms|%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills, ackThreadPoolQueueSize, ackThreadPoolQueueHeadWaitTimeMills), String.format("%d-%d(%.1fw, %.1f, %.1f)", timerReadBehind, timerOffsetBehind, timerCongestNum / 10000.0f, timerEnqueueTps, timerDequeueTps), pageCacheLockTimeMills, String.format("%2.2f", hour),