This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 715dd5a885 Adding the EnableLmqStats option allows monitoring of LMQ statistics at runtime (#8973)
715dd5a885 is described below

commit 715dd5a885ae89ebc05aea33971029d7306c80ae
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Fri Nov 22 12:57:38 2024 +0800

    Adding the EnableLmqStats option allows monitoring of LMQ statistics at runtime (#8973)
---
 .../apache/rocketmq/broker/BrokerController.java   |   2 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  11 ++
 .../store/stats/LmqBrokerStatsManager.java         | 117 ++++++++++++---------
 3 files changed, 81 insertions(+), 49 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 143922e456..b907489bbf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -341,7 +341,7 @@ public class BrokerController {
         this.messageStoreConfig = messageStoreConfig;
         this.authConfig = authConfig;
         this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
-        this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
+        this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
         this.broadcastOffsetManager = new BroadcastOffsetManager(this);
         if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) {
             this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir());
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f459abf0db..9d8d913521 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -435,6 +435,9 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean appendCkAsync = false;
 
+
+    private boolean enableLmqStats = false;
+
     /**
      * V2 is recommended in cases where LMQ feature is extensively used.
      */
@@ -1905,6 +1908,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.appendCkAsync = appendCkAsync;
     }
 
+    public boolean isEnableLmqStats() {
+        return enableLmqStats;
+    }
+
+    public void setEnableLmqStats(boolean enableLmqStats) {
+        this.enableLmqStats = enableLmqStats;
+    }
+
     public String getConfigManagerVersion() {
         return configManagerVersion;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
index b17fcbc9ca..20ed879331 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
@@ -16,23 +16,29 @@
  */
 package org.apache.rocketmq.store.stats;
 
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 
 public class LmqBrokerStatsManager extends BrokerStatsManager {
 
-    public LmqBrokerStatsManager(String clusterName, boolean enableQueueStat) {
-        super(clusterName, enableQueueStat);
+    private final BrokerConfig brokerConfig;
+
+    public LmqBrokerStatsManager(BrokerConfig brokerConfig) {
+        super(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat());
+        this.brokerConfig = brokerConfig;
     }
 
     @Override
     public void incGroupGetNums(final String group, final String topic, final int incValue) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.incGroupGetNums(lmqGroup, lmqTopic, incValue);
     }
@@ -41,25 +47,28 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
     public void incGroupGetSize(final String group, final String topic, final int incValue) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.incGroupGetSize(lmqGroup, lmqTopic, incValue);
     }
 
-
     @Override
     public void incGroupAckNums(final String group, final String topic, final int incValue) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.incGroupAckNums(lmqGroup, lmqTopic, incValue);
     }
@@ -68,11 +77,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
     public void incGroupCkNums(final String group, final String topic, final int incValue) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.incGroupCkNums(lmqGroup, lmqTopic, incValue);
     }
@@ -81,11 +92,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
     public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.incGroupGetLatency(lmqGroup, lmqTopic, queueId, incValue);
     }
@@ -94,11 +107,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
     public void incSendBackNums(final String group, final String topic) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.incSendBackNums(lmqGroup, lmqTopic);
     }
@@ -107,11 +122,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
     public double tpsGroupGetNums(final String group, final String topic) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         return super.tpsGroupGetNums(lmqGroup, lmqTopic);
     }
@@ -121,11 +138,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
         final long fallBehind) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.recordDiskFallBehindTime(lmqGroup, lmqTopic, queueId, fallBehind);
     }
@@ -135,11 +154,13 @@ public class LmqBrokerStatsManager extends BrokerStatsManager {
         final long fallBehind) {
         String lmqGroup = group;
         String lmqTopic = topic;
-        if (MixAll.isLmq(group)) {
-            lmqGroup = MixAll.LMQ_PREFIX;
-        }
-        if (MixAll.isLmq(topic)) {
-            lmqTopic = MixAll.LMQ_PREFIX;
+        if (!brokerConfig.isEnableLmqStats()) {
+            if (MixAll.isLmq(group)) {
+                lmqGroup = MixAll.LMQ_PREFIX;
+            }
+            if (MixAll.isLmq(topic)) {
+                lmqTopic = MixAll.LMQ_PREFIX;
+            }
         }
         super.recordDiskFallBehindSize(lmqGroup, lmqTopic, queueId, fallBehind);
     }

Reply via email to