This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 715dd5a885 Adding the EnableLmqStats option allows monitoring of LMQ statistics at runtime (#8973) 715dd5a885 is described below commit 715dd5a885ae89ebc05aea33971029d7306c80ae Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Fri Nov 22 12:57:38 2024 +0800 Adding the EnableLmqStats option allows monitoring of LMQ statistics at runtime (#8973) --- .../apache/rocketmq/broker/BrokerController.java | 2 +- .../org/apache/rocketmq/common/BrokerConfig.java | 11 ++ .../store/stats/LmqBrokerStatsManager.java | 117 ++++++++++++--------- 3 files changed, 81 insertions(+), 49 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 143922e456..b907489bbf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -341,7 +341,7 @@ public class BrokerController { this.messageStoreConfig = messageStoreConfig; this.authConfig = authConfig; this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort())); - this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); + this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); this.broadcastOffsetManager = new BroadcastOffsetManager(this); if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) { this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir()); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f459abf0db..9d8d913521 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -435,6 +435,9 @@ public class BrokerConfig extends BrokerIdentity { private boolean appendCkAsync = false; + + private boolean enableLmqStats = false; + /** * V2 is recommended in cases where LMQ feature is extensively used. */ @@ -1905,6 +1908,14 @@ public class BrokerConfig extends BrokerIdentity { this.appendCkAsync = appendCkAsync; } + public boolean isEnableLmqStats() { + return enableLmqStats; + } + + public void setEnableLmqStats(boolean enableLmqStats) { + this.enableLmqStats = enableLmqStats; + } + public String getConfigManagerVersion() { return configManagerVersion; } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java index b17fcbc9ca..20ed879331 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java @@ -16,23 +16,29 @@ */ package org.apache.rocketmq.store.stats; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; public class LmqBrokerStatsManager extends BrokerStatsManager { - public LmqBrokerStatsManager(String clusterName, boolean enableQueueStat) { - super(clusterName, enableQueueStat); + private final BrokerConfig brokerConfig; + + public LmqBrokerStatsManager(BrokerConfig brokerConfig) { + super(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()); + this.brokerConfig = brokerConfig; } @Override public void incGroupGetNums(final String group, final String topic, final int incValue) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.incGroupGetNums(lmqGroup, lmqTopic, incValue); } @@ -41,25 +47,28 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { public void incGroupGetSize(final String group, final String topic, final int incValue) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.incGroupGetSize(lmqGroup, lmqTopic, incValue); } - @Override public void incGroupAckNums(final String group, final String topic, final int incValue) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.incGroupAckNums(lmqGroup, lmqTopic, incValue); } @@ -68,11 +77,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { public void incGroupCkNums(final String group, final String topic, final int incValue) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.incGroupCkNums(lmqGroup, lmqTopic, incValue); } @@ -81,11 +92,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.incGroupGetLatency(lmqGroup, lmqTopic, queueId, incValue); } @@ -94,11 +107,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { public void incSendBackNums(final String group, final String topic) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.incSendBackNums(lmqGroup, lmqTopic); } @@ -107,11 +122,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { public double tpsGroupGetNums(final String group, final String topic) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } return super.tpsGroupGetNums(lmqGroup, lmqTopic); } @@ -121,11 +138,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { final long fallBehind) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.recordDiskFallBehindTime(lmqGroup, lmqTopic, queueId, fallBehind); } @@ -135,11 +154,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager { final long fallBehind) { String lmqGroup = group; String lmqTopic = topic; - if (MixAll.isLmq(group)) { - lmqGroup = MixAll.LMQ_PREFIX; - } - if (MixAll.isLmq(topic)) { - lmqTopic = MixAll.LMQ_PREFIX; + if (!brokerConfig.isEnableLmqStats()) { + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } } super.recordDiskFallBehindSize(lmqGroup, lmqTopic, queueId, fallBehind); }