This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7967edfc5e [ISSUE #9677] Resolve metrics static variable conflicts in 
BrokerContainer mode (#9678)
7967edfc5e is described below

commit 7967edfc5e28041a6019d3b3ce353fcef0eb7d3f
Author: rongtong <[email protected]>
AuthorDate: Thu Sep 11 16:29:20 2025 +0800

    [ISSUE #9677] Resolve metrics static variable conflicts in BrokerContainer 
mode (#9678)
    
    * Cherry-pick partial changes from 67db1757df - BrokerMetricsManager 
refactoring
    
    - Excluded DefaultMappedFile, METRICS_REFACTORING_GUIDE.md, TimerMetrics, 
and RocksDB files per requirements
    - Successfully applied changes to most broker and store modules
    - Compilation errors in TimerMessageStore.java will be fixed in next commit
    
    * fix: Resolve metrics static variable conflicts in BrokerContainer mode
    
    Convert static metrics variables to instance-level to fix resource leaks and
    data conflicts in BrokerContainer scenarios with multiple broker instances.
    
    ## Problem Statement
    In BrokerContainer mode, multiple broker instances share static metrics 
variables
    from BrokerMetricsManager and DefaultStoreMetricsManager, causing:
    - Metrics data conflicts between different broker instances
    - Resource leaks during frequent addBroker/removeBroker operations
    - Incorrect metrics aggregation across multiple brokers
    
    ## Solution
    - Convert static metrics variables to instance-level variables
    - Add proper getter methods for external access
    - Ensure each broker instance maintains isolated metrics
    - Apply instanceof checks for type safety in TimerMessageStore
    
    ## Files Modified
    - 
broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
    - store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
    - METRICS_REFACTORING_GUIDE.md (documentation)
    
    ## Key Benefits
    ✅ Eliminates metrics conflicts between broker instances
    ✅ Prevents resource leaks in dynamic broker scenarios
    ✅ Maintains proper metrics isolation per broker
    ✅ Supports BrokerContainer mode with multiple brokers
    ✅ Backward compatible with existing functionality
    
    Resolves metrics static variable issues in multi-broker container 
environments.
    
    * Delete useless file
    
    * Fix test can not pass
    
    * Fix test can not pass
---
 .../apache/rocketmq/broker/BrokerController.java   |   4 +
 .../broker/metrics/BrokerMetricsManager.java       | 121 ++++++++++----
 .../processor/AbstractSendMessageProcessor.java    |   4 +-
 .../broker/processor/AdminBrokerProcessor.java     |  18 +-
 .../processor/DefaultPullMessageResultHandler.java |   6 +-
 .../broker/processor/EndTransactionProcessor.java  |   8 +-
 .../broker/processor/PeekMessageProcessor.java     |   8 +-
 .../broker/processor/PopMessageProcessor.java      |   8 +-
 .../broker/processor/PopReviveService.java         |   8 +-
 .../broker/processor/ReplyMessageProcessor.java    |   8 +-
 .../broker/processor/SendMessageProcessor.java     |  12 +-
 .../broker/schedule/ScheduleMessageService.java    |  16 +-
 .../queue/TransactionalMessageBridge.java          |   8 +-
 .../rocketmq/broker/BrokerControllerTest.java      |   9 +
 .../broker/metrics/BrokerMetricsManagerTest.java   |  29 +++-
 .../broker/processor/AdminBrokerProcessorTest.java |   3 +
 .../processor/EndTransactionProcessorTest.java     |   3 +
 .../broker/processor/PeekMessageProcessorTest.java |   3 +
 .../broker/processor/PopMessageProcessorTest.java  |   3 +
 .../broker/processor/PullMessageProcessorTest.java |   3 +
 .../processor/ReplyMessageProcessorTest.java       |   3 +
 .../broker/processor/SendMessageProcessorTest.java |   3 +
 .../schedule/ScheduleMessageServiceTest.java       |  20 ++-
 .../proxy/metrics/ProxyMetricsManager.java         |   2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  11 +-
 .../store/metrics/DefaultStoreMetricsManager.java  | 185 ++++++++++++++-------
 .../rocketmq/store/timer/TimerMessageStore.java    |  16 +-
 27 files changed, 365 insertions(+), 157 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 33331bbabb..0cdec87d5e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -500,6 +500,10 @@ public class BrokerController {
         return brokerMetricsManager;
     }
 
+    public void setBrokerMetricsManager(BrokerMetricsManager 
brokerMetricsManager) {
+        this.brokerMetricsManager = brokerMetricsManager;
+    }
+
     protected void initializeRemotingServer() throws 
CloneNotSupportedException {
         RemotingServer tcpRemotingServer = new 
NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
         NettyServerConfig fastConfig = (NettyServerConfig) 
this.nettyServerConfig.clone();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 6300d763d6..5a32cf3c67 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -121,46 +121,45 @@ public class BrokerMetricsManager {
     private final MessageStore messageStore;
     private final BrokerController brokerController;
     private final ConsumerLagCalculator consumerLagCalculator;
-    private final static Map<String, String> LABEL_MAP = new HashMap<>();
+    private final Map<String, String> labelMap = new HashMap<>();
     private OtlpGrpcMetricExporter metricExporter;
     private PeriodicMetricReader periodicMetricReader;
     private PrometheusHttpServer prometheusHttpServer;
     private MetricExporter loggingMetricExporter;
     private Meter brokerMeter;
 
-    public static Supplier<AttributesBuilder> attributesBuilderSupplier = 
Attributes::builder;
+    private Supplier<AttributesBuilder> attributesBuilderSupplier = 
Attributes::builder;
 
     // broker stats metrics
-    public static ObservableLongGauge processorWatermark = new 
NopObservableLongGauge();
-    public static ObservableLongGauge brokerPermission = new 
NopObservableLongGauge();
-    public static ObservableLongGauge topicNum = new NopObservableLongGauge();
-    public static ObservableLongGauge consumerGroupNum = new 
NopObservableLongGauge();
-
+    private ObservableLongGauge processorWatermark = new 
NopObservableLongGauge();
+    private ObservableLongGauge brokerPermission = new 
NopObservableLongGauge();
+    private ObservableLongGauge topicNum = new NopObservableLongGauge();
+    private ObservableLongGauge consumerGroupNum = new 
NopObservableLongGauge();
 
     // request metrics
-    public static LongCounter messagesInTotal = new NopLongCounter();
-    public static LongCounter messagesOutTotal = new NopLongCounter();
-    public static LongCounter throughputInTotal = new NopLongCounter();
-    public static LongCounter throughputOutTotal = new NopLongCounter();
-    public static LongHistogram messageSize = new NopLongHistogram();
-    public static LongHistogram topicCreateExecuteTime = new 
NopLongHistogram();
-    public static LongHistogram consumerGroupCreateExecuteTime = new 
NopLongHistogram();
+    private LongCounter messagesInTotal = new NopLongCounter();
+    private LongCounter messagesOutTotal = new NopLongCounter();
+    private LongCounter throughputInTotal = new NopLongCounter();
+    private LongCounter throughputOutTotal = new NopLongCounter();
+    private LongHistogram messageSize = new NopLongHistogram();
+    private LongHistogram topicCreateExecuteTime = new NopLongHistogram();
+    private LongHistogram consumerGroupCreateExecuteTime = new 
NopLongHistogram();
 
     // client connection metrics
-    public static ObservableLongGauge producerConnection = new 
NopObservableLongGauge();
-    public static ObservableLongGauge consumerConnection = new 
NopObservableLongGauge();
+    private ObservableLongGauge producerConnection = new 
NopObservableLongGauge();
+    private ObservableLongGauge consumerConnection = new 
NopObservableLongGauge();
 
     // Lag metrics
-    public static ObservableLongGauge consumerLagMessages = new 
NopObservableLongGauge();
-    public static ObservableLongGauge consumerLagLatency = new 
NopObservableLongGauge();
-    public static ObservableLongGauge consumerInflightMessages = new 
NopObservableLongGauge();
-    public static ObservableLongGauge consumerQueueingLatency = new 
NopObservableLongGauge();
-    public static ObservableLongGauge consumerReadyMessages = new 
NopObservableLongGauge();
-    public static LongCounter sendToDlqMessages = new NopLongCounter();
-    public static ObservableLongGauge halfMessages = new 
NopObservableLongGauge();
-    public static LongCounter commitMessagesTotal = new NopLongCounter();
-    public static LongCounter rollBackMessagesTotal = new NopLongCounter();
-    public static LongHistogram transactionFinishLatency = new 
NopLongHistogram();
+    private ObservableLongGauge consumerLagMessages = new 
NopObservableLongGauge();
+    private ObservableLongGauge consumerLagLatency = new 
NopObservableLongGauge();
+    private ObservableLongGauge consumerInflightMessages = new 
NopObservableLongGauge();
+    private ObservableLongGauge consumerQueueingLatency = new 
NopObservableLongGauge();
+    private ObservableLongGauge consumerReadyMessages = new 
NopObservableLongGauge();
+    private LongCounter sendToDlqMessages = new NopLongCounter();
+    private ObservableLongGauge halfMessages = new NopObservableLongGauge();
+    private LongCounter commitMessagesTotal = new NopLongCounter();
+    private LongCounter rollBackMessagesTotal = new NopLongCounter();
+    private LongHistogram transactionFinishLatency = new NopLongHistogram();
 
     @SuppressWarnings("DoubleBraceInitialization")
     public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new 
ArrayList<String>() {
@@ -177,13 +176,13 @@ public class BrokerMetricsManager {
         init();
     }
 
-    public static AttributesBuilder newAttributesBuilder() {
+    public AttributesBuilder newAttributesBuilder() {
         AttributesBuilder attributesBuilder;
         if (attributesBuilderSupplier == null) {
             attributesBuilderSupplier = Attributes::builder;
         }
         attributesBuilder = attributesBuilderSupplier.get();
-        LABEL_MAP.forEach(attributesBuilder::put);
+        labelMap.forEach(attributesBuilder::put);
         return attributesBuilder;
     }
 
@@ -242,6 +241,56 @@ public class BrokerMetricsManager {
         return brokerMeter;
     }
 
+    // Getter methods for metrics variables
+    public LongCounter getMessagesInTotal() {
+        return messagesInTotal;
+    }
+
+    public LongCounter getMessagesOutTotal() {
+        return messagesOutTotal;
+    }
+
+    public LongCounter getThroughputInTotal() {
+        return throughputInTotal;
+    }
+
+    public LongCounter getThroughputOutTotal() {
+        return throughputOutTotal;
+    }
+
+    public LongHistogram getMessageSize() {
+        return messageSize;
+    }
+
+    public LongCounter getSendToDlqMessages() {
+        return sendToDlqMessages;
+    }
+
+    public LongCounter getCommitMessagesTotal() {
+        return commitMessagesTotal;
+    }
+
+    public LongCounter getRollBackMessagesTotal() {
+        return rollBackMessagesTotal;
+    }
+
+    public LongHistogram getTransactionFinishLatency() {
+        return transactionFinishLatency;
+    }
+
+    public LongHistogram getTopicCreateExecuteTime() {
+        return topicCreateExecuteTime;
+    }
+
+    public LongHistogram getConsumerGroupCreateExecuteTime() {
+        return consumerGroupCreateExecuteTime;
+    }
+
+    // Setter method for testing purposes
+    public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> 
attributesBuilderSupplier) {
+        this.attributesBuilderSupplier = attributesBuilderSupplier;
+    }
+
     private boolean checkConfig() {
         if (brokerConfig == null) {
             return false;
@@ -282,15 +331,15 @@ public class BrokerMetricsManager {
                     LOGGER.warn("metricsLabel is not valid: {}", labels);
                     continue;
                 }
-                LABEL_MAP.put(split[0], split[1]);
+                labelMap.put(split[0], split[1]);
             }
         }
         if (brokerConfig.isMetricsInDelta()) {
-            LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
+            labelMap.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
         }
-        LABEL_MAP.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
-        LABEL_MAP.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
-        LABEL_MAP.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
+        labelMap.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
+        labelMap.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
+        labelMap.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
 
         SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
             .setResource(Resource.empty());
@@ -699,13 +748,13 @@ public class BrokerMetricsManager {
     }
     private void initOtherMetrics() {
         if (brokerConfig.isEnableRemotingMetrics()) {
-            RemotingMetricsManager.initMetrics(brokerMeter, 
BrokerMetricsManager::newAttributesBuilder);
+            RemotingMetricsManager.initMetrics(brokerMeter, 
this::newAttributesBuilder);
         }
         if (brokerConfig.isEnableMessageStoreMetrics()) {
-            messageStore.initMetrics(brokerMeter, 
BrokerMetricsManager::newAttributesBuilder);
+            messageStore.initMetrics(brokerMeter, this::newAttributesBuilder);
         }
         if (brokerConfig.isEnablePopMetrics()) {
-            PopMetricsManager.initMetrics(brokerMeter, brokerController, 
BrokerMetricsManager::newAttributesBuilder);
+            PopMetricsManager.initMetrics(brokerMeter, brokerController, 
this::newAttributesBuilder);
         }
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 928bd397e1..2277881209 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -182,12 +182,12 @@ public abstract class AbstractSendMessageProcessor 
implements NettyRequestProces
         if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
             || delayLevel < 0) {
 
-            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+            Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                 .put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
                 .put(LABEL_TOPIC, requestHeader.getOriginTopic())
                 .put(LABEL_IS_SYSTEM, 
BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), 
requestHeader.getGroup()))
                 .build();
-            BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+            
this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1, 
attributes);
 
             isDLQ = true;
             newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4eb78fc1c2..298e239086 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -66,7 +66,7 @@ import 
org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.broker.metrics.InvocationStatus;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -614,11 +614,11 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             executionTime = System.currentTimeMillis() - startTime;
             InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
                 InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
-            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+            Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                 .put(LABEL_INVOCATION_STATUS, status.getName())
                 .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
                 .build();
-            BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, 
attributes);
+            
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime,
 attributes);
         }
         LOGGER.info("executionTime of create topic:{} is {} ms", topic, 
executionTime);
         return response;
@@ -695,11 +695,11 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             executionTime = System.currentTimeMillis() - startTime;
             InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
                 InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
-            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+            Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                 .put(LABEL_INVOCATION_STATUS, status.getName())
                 .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames))
                 .build();
-            BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, 
attributes);
+            
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime,
 attributes);
         }
         LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, 
executionTime);
         return response;
@@ -1551,10 +1551,10 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         }
         InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
             InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
-        Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+        Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
             .put(LABEL_INVOCATION_STATUS, status.getName())
             .build();
-        
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, 
attributes);
+        
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime,
 attributes);
         return response;
     }
 
@@ -1589,10 +1589,10 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             LOGGER.info("executionTime of create 
updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime);
             InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
                 InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
-            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+            Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                 .put(LABEL_INVOCATION_STATUS, status.getName())
                 .build();
-            
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, 
attributes);
+            
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime,
 attributes);
         }
 
         return response;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 43b66b4c51..3725732537 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -122,13 +122,13 @@ public class DefaultPullMessageResultHandler implements 
PullMessageResultHandler
                 
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(),
 getMessageResult.getMessageCount());
 
                 if 
(!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
-                    Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                    Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                         .put(LABEL_TOPIC, requestHeader.getTopic())
                         .put(LABEL_CONSUMER_GROUP, 
requestHeader.getConsumerGroup())
                         .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(requestHeader.getTopic()) || 
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
                         .build();
-                    
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), 
attributes);
-                    
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
 attributes);
                 }
 
                 if (!channelIsWritable(channel, requestHeader)) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index 468a8791d4..153ac24c1f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.broker.transaction.OperationResult;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.TopicFilterType;
@@ -149,12 +149,12 @@ public class EndTransactionProcessor implements 
NettyRequestProcessor {
                         
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                         // successful committed, then total num of 
half-messages minus 1
                         
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(),
 -1);
-                        BrokerMetricsManager.commitMessagesTotal.add(1, 
BrokerMetricsManager.newAttributesBuilder()
+                        
this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1, 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                                 .put(LABEL_TOPIC, msgInner.getTopic())
                                 .build());
                         // record the commit latency.
                         Long commitLatency = (System.currentTimeMillis() - 
result.getPrepareMessage().getBornTimestamp()) / 1000;
-                        
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, 
BrokerMetricsManager.newAttributesBuilder()
+                        
this.brokerController.getBrokerMetricsManager().getTransactionFinishLatency().record(commitLatency,
 this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                                 .put(LABEL_TOPIC, msgInner.getTopic())
                                 .build());
                     }
@@ -176,7 +176,7 @@ public class EndTransactionProcessor implements 
NettyRequestProcessor {
                     
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                     // roll back, then total num of half-messages minus 1
                     
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC),
 -1);
-                    BrokerMetricsManager.rollBackMessagesTotal.add(1, 
BrokerMetricsManager.newAttributesBuilder()
+                    
this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1,
 this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                             .put(LABEL_TOPIC, 
result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
                             .build());
                 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 40117b74a5..584d248ab2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -255,13 +255,13 @@ public class PeekMessageProcessor implements 
NettyRequestProcessor {
         }
         if (getMessageTmpResult != null) {
             if (!getMessageTmpResult.getMessageMapedList().isEmpty() && 
!isRetry) {
-                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                     .put(LABEL_TOPIC, requestHeader.getTopic())
                     .put(LABEL_CONSUMER_GROUP, 
requestHeader.getConsumerGroup())
                     .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(requestHeader.getTopic()) || 
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
                     .build();
-                
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), 
attributes);
-                
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
 attributes);
+                
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
 attributes);
+                
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
 attributes);
             }
 
             for (SelectMappedBufferResult mappedBuffer : 
getMessageTmpResult.getMessageMapedList()) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 7d98705576..83ca35091e 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.broker.longpolling.PollingHeader;
 import org.apache.rocketmq.broker.longpolling.PollingResult;
 import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
 import org.apache.rocketmq.broker.longpolling.PopRequest;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.broker.pop.PopConsumerContext;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -788,14 +788,14 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                     
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
 topic,
                         result.getBufferTotalSize());
 
-                    Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                    Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                         .put(LABEL_TOPIC, requestHeader.getTopic())
                         .put(LABEL_CONSUMER_GROUP, 
requestHeader.getConsumerGroup())
                         .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(requestHeader.getTopic()) || 
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
                         .put(LABEL_IS_RETRY, isRetry)
                         .build();
-                    
BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes);
-                    
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), 
attributes);
+                    
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(result.getMessageCount(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(result.getBufferTotalSize(),
 attributes);
 
                     if (isOrder) {
                         
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(),
 isRetry, topic,
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 2be41a69d6..ea7f177f39 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -31,7 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -223,13 +223,13 @@ public class PopReviveService extends ServiceThread {
                     
brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, 
queueId,
                         brokerController.getMessageStore().now() - 
foundList.get(foundList.size() - 1).getStoreTimestamp());
 
-                    Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                    Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                         .put(LABEL_TOPIC, topic)
                         .put(LABEL_CONSUMER_GROUP, group)
                         .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
                         .build();
-                    
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), 
attributes);
-                    
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
 attributes);
 
                     break;
                 case NO_MATCHED_MESSAGE:
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index a70b48debe..9b2bbc34e8 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -298,14 +298,14 @@ public class ReplyMessageProcessor extends 
AbstractSendMessageProcessor {
             
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), 
putMessageResult.getAppendMessageResult().getMsgNum());
 
             if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
-                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                     .put(LABEL_TOPIC, msg.getTopic())
                     .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
                     .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(msg.getTopic()))
                     .build();
-                
BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(),
 attributes);
-                
BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(),
 attributes);
-                
BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes()
 / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+                
this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(putMessageResult.getAppendMessageResult().getMsgNum(),
 attributes);
+                
this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(putMessageResult.getAppendMessageResult().getWroteBytes(),
 attributes);
+                
this.brokerController.getBrokerMetricsManager().getMessageSize().record(putMessageResult.getAppendMessageResult().getWroteBytes()
 / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
             }
 
             
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 669cd5e677..6d60290a58 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -207,12 +207,12 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             }
 
             if (reconsumeTimes > maxReconsumeTimes || 
sendRetryMessageToDeadLetterQueueDirectly) {
-                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                     .put(LABEL_CONSUMER_GROUP, 
requestHeader.getProducerGroup())
                     .put(LABEL_TOPIC, requestHeader.getTopic())
                     .put(LABEL_IS_SYSTEM, 
BrokerMetricsManager.isSystem(requestHeader.getTopic(), 
requestHeader.getProducerGroup()))
                     .build();
-                BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+                
this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1, 
attributes);
 
                 properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
                 newTopic = MixAll.getDLQTopic(groupName);
@@ -468,14 +468,14 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 (int) (this.brokerController.getMessageStore().now() - 
beginTimeMillis));
 
             if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
-                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                     .put(LABEL_TOPIC, msg.getTopic())
                     .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
                     .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(msg.getTopic()))
                     .build();
-                
BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(),
 attributes);
-                
BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(),
 attributes);
-                
BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes()
 / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+                
this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(putMessageResult.getAppendMessageResult().getMsgNum(),
 attributes);
+                
this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(putMessageResult.getAppendMessageResult().getWroteBytes(),
 attributes);
+                
this.brokerController.getBrokerMetricsManager().getMessageSize().record(putMessageResult.getAppendMessageResult().getWroteBytes()
 / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
             }
 
             response.setRemark(null);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 25c24aff98..bec75fe2fb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -722,26 +722,26 @@ public class ScheduleMessageService extends ConfigManager 
{
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
 TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, 
result.getAppendMessageResult().getMsgNum());
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
 TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, 
result.getAppendMessageResult().getWroteBytes());
 
-                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                Attributes attributes = 
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                     .put(LABEL_TOPIC, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)
                     .put(LABEL_CONSUMER_GROUP, MixAll.SCHEDULE_CONSUMER_GROUP)
                     .put(LABEL_IS_SYSTEM, true)
                     .build();
-                
BrokerMetricsManager.messagesOutTotal.add(result.getAppendMessageResult().getMsgNum(),
 attributes);
-                
BrokerMetricsManager.throughputOutTotal.add(result.getAppendMessageResult().getWroteBytes(),
 attributes);
+                
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(result.getAppendMessageResult().getMsgNum(),
 attributes);
+                
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(result.getAppendMessageResult().getWroteBytes(),
 attributes);
 
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic,
 result.getAppendMessageResult().getMsgNum(), 1);
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic,
 result.getAppendMessageResult().getWroteBytes());
                 
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic,
 result.getAppendMessageResult().getMsgNum());
 
-                attributes = BrokerMetricsManager.newAttributesBuilder()
+                attributes = 
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                     .put(LABEL_TOPIC, topic)
                     .put(LABEL_MESSAGE_TYPE, 
TopicMessageType.DELAY.getMetricsValue())
                     .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
                     .build();
-                
BrokerMetricsManager.messagesInTotal.add(result.getAppendMessageResult().getMsgNum(),
 attributes);
-                
BrokerMetricsManager.throughputInTotal.add(result.getAppendMessageResult().getWroteBytes(),
 attributes);
-                
BrokerMetricsManager.messageSize.record(result.getAppendMessageResult().getWroteBytes()
 / result.getAppendMessageResult().getMsgNum(), attributes);
+                
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(result.getAppendMessageResult().getMsgNum(),
 attributes);
+                
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(result.getAppendMessageResult().getWroteBytes(),
 attributes);
+                
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessageSize().record(result.getAppendMessageResult().getWroteBytes()
 / result.getAppendMessageResult().getMsgNum(), attributes);
             }
         }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
index 2383f4f917..5fcc1f56b7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
@@ -27,7 +27,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.MixAll;
@@ -146,13 +146,13 @@ public class TransactionalMessageBridge {
                         this.brokerController.getMessageStore().now() - 
foundList.get(foundList.size() - 1)
                             .getStoreTimestamp());
 
-                    Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                    Attributes attributes = 
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
                         .put(LABEL_TOPIC, topic)
                         .put(LABEL_CONSUMER_GROUP, group)
                         .put(LABEL_IS_SYSTEM, 
TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
                         .build();
-                    
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), 
attributes);
-                    
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
 attributes);
+                    
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
 attributes);
 
                     break;
                 case NO_MATCHED_MESSAGE:
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 789143234f..24a26b2350 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -75,6 +75,15 @@ public class BrokerControllerTest {
         brokerController.shutdown();
     }
 
+    @Test
+    public void testBrokerMetricsManagerInitialization() throws Exception {
+        BrokerController brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, new NettyClientConfig(), messageStoreConfig);
+        assertThat(brokerController.initialize()).isTrue();
+        // Verify that brokerMetricsManager is properly initialized and not 
null
+        assertThat(brokerController.getBrokerMetricsManager()).isNotNull();
+        brokerController.shutdown();
+    }
+
     @After
     public void destroy() {
         UtilAll.deleteFile(new File(messageStoreConfig.getStorePathRootDir()));
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
index 9264eb4b56..9e4cfa70c1 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
@@ -43,16 +43,36 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 public class BrokerMetricsManagerTest {
 
+    private BrokerMetricsManager createTestBrokerMetricsManager() {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        String storePathRootDir = System.getProperty("java.io.tmpdir") + 
File.separator + "store-"
+                + UUID.randomUUID();
+        messageStoreConfig.setStorePathRootDir(storePathRootDir);
+        BrokerConfig brokerConfig = new BrokerConfig();
+
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(0);
+
+        BrokerController brokerController = new BrokerController(brokerConfig, 
nettyServerConfig,
+                new NettyClientConfig(), messageStoreConfig);
+
+        return new BrokerMetricsManager(brokerController);
+    }
+
     @Test
     public void testNewAttributesBuilder() {
-        Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder().put("a", "b")
+        BrokerMetricsManager metricsManager = createTestBrokerMetricsManager();
+        Attributes attributes = metricsManager.newAttributesBuilder().put("a", 
"b")
                 .build();
         assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b");
     }
 
     @Test
     public void testCustomizedAttributesBuilder() {
-        BrokerMetricsManager.attributesBuilderSupplier = () -> new 
AttributesBuilder() {
+        BrokerMetricsManager metricsManager = createTestBrokerMetricsManager();
+        
+        // Create a custom attributes builder supplier for testing
+        metricsManager.setAttributesBuilderSupplier(() -> new 
AttributesBuilder() {
             private AttributesBuilder attributesBuilder = Attributes.builder();
 
             @Override
@@ -77,8 +97,9 @@ public class BrokerMetricsManagerTest {
                 attributesBuilder.putAll(attributes);
                 return this;
             }
-        };
-        Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder().put("a", "b")
+        });
+        
+        Attributes attributes = metricsManager.newAttributesBuilder().put("a", 
"b")
                 .build();
         assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b");
         
assertThat(attributes.get(AttributeKey.stringKey("customized"))).isEqualTo("value");
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index f3d0eb0782..1bf99eadfb 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
@@ -238,6 +239,8 @@ public class AdminBrokerProcessorTest {
         brokerController.setMessageStore(messageStore);
         
brokerController.setAuthenticationMetadataManager(authenticationMetadataManager);
         
brokerController.setAuthorizationMetadataManager(authorizationMetadataManager);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         Field field = BrokerController.class.getDeclaredField("broker2Client");
         field.setAccessible(true);
         field.set(brokerController, broker2Client);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index e4360f147b..1751ad96fd 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.transaction.OperationResult;
 import org.apache.rocketmq.broker.transaction.TransactionMetrics;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -83,6 +84,8 @@ public class EndTransactionProcessorTest {
         
when(transactionMsgService.getTransactionMetrics()).thenReturn(transactionMetrics);
         brokerController.setMessageStore(messageStore);
         brokerController.setTransactionalMessageService(transactionMsgService);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         endTransactionProcessor = new 
EndTransactionProcessor(brokerController);
     }
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
index 9baf2a6ebb..5722b031b0 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
@@ -86,6 +87,8 @@ public class PeekMessageProcessorTest {
 
     @Before
     public void init() {
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         peekMessageProcessor = new PeekMessageProcessor(brokerController);
         when(brokerController.getMessageStore()).thenReturn(messageStore);
         topicConfigManager = new TopicConfigManager(brokerController);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index fdb0690e5d..59559d3cfd 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
@@ -78,6 +79,8 @@ public class PopMessageProcessorTest {
     public void init() {
         brokerController.setMessageStore(messageStore);
         brokerController.getBrokerConfig().setEnablePopBufferMerge(true);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         popMessageProcessor = new PopMessageProcessor(brokerController);
         when(handlerContext.channel()).thenReturn(embeddedChannel);
         
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig(topic));
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index 83c3011185..cecd1ff86a 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -82,6 +83,8 @@ public class PullMessageProcessorTest {
     @Before
     public void init() {
         brokerController.setMessageStore(messageStore);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         SubscriptionGroupManager subscriptionGroupManager = new 
SubscriptionGroupManager(brokerController);
         pullMessageProcessor = new PullMessageProcessor(brokerController);
         
when(brokerController.getPullMessageProcessor()).thenReturn(pullMessageProcessor);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
index 266c8491cb..03af9b948b 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -81,6 +82,8 @@ public class ReplyMessageProcessorTest {
     public void init() throws IllegalAccessException, NoSuchFieldException {
         clientInfo = new ClientChannelInfo(channel, "127.0.0.1", 
LanguageCode.JAVA, 0);
         brokerController.setMessageStore(messageStore);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         Field field = BrokerController.class.getDeclaredField("broker2Client");
         field.setAccessible(true);
         field.set(brokerController, broker2Client);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 9da6a96ec9..ce8b3405f6 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import org.apache.commons.codec.DecoderException;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -102,6 +103,8 @@ public class SendMessageProcessorTest {
     @Before
     public void init() {
         brokerController.setMessageStore(messageStore);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        brokerController.setBrokerMetricsManager(new 
BrokerMetricsManager(brokerController));
         TopicConfigManager topicConfigManager = new 
TopicConfigManager(brokerController);
         topicConfigManager.getTopicConfigTable().put(topic, new 
TopicConfig(topic));
         SubscriptionGroupManager subscriptionGroupManager = new 
SubscriptionGroupManager(brokerController);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
index b90fb2931d..675c9a57c8 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.util.HookUtils;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -48,6 +49,9 @@ import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import io.opentelemetry.api.common.Attributes;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -132,10 +136,22 @@ public class ScheduleMessageServiceTest {
         
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(manager);
         EscapeBridge escapeBridge = new EscapeBridge(brokerController);
         
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
-        scheduleMessageService = new ScheduleMessageService(brokerController);
+        // Initialize BrokerMetricsManager to prevent NPE in tests
+        BrokerMetricsManager brokerMetricsManager = 
Mockito.mock(BrokerMetricsManager.class);
+        // Mock newAttributesBuilder to return a valid AttributesBuilder 
instead of null
+        
Mockito.when(brokerMetricsManager.newAttributesBuilder()).thenReturn(Attributes.builder());
+        // Mock metrics getter methods to return Nop implementations to 
prevent NPE
+        Mockito.when(brokerMetricsManager.getMessagesInTotal()).thenReturn(new 
NopLongCounter());
+        
Mockito.when(brokerMetricsManager.getMessagesOutTotal()).thenReturn(new 
NopLongCounter());
+        
Mockito.when(brokerMetricsManager.getThroughputInTotal()).thenReturn(new 
NopLongCounter());
+        
Mockito.when(brokerMetricsManager.getThroughputOutTotal()).thenReturn(new 
NopLongCounter());
+        Mockito.when(brokerMetricsManager.getMessageSize()).thenReturn(new 
NopLongHistogram());
+        
Mockito.when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
+        scheduleMessageService = Mockito.spy(new 
ScheduleMessageService(brokerController));
+        // Mock ScheduleMessageService before it's used in HookUtils
+        
Mockito.when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
         scheduleMessageService.load();
         scheduleMessageService.start();
-        
Mockito.when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
     }
 
     @Test
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
index 2b8dac5d8b..81db576e3d 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
@@ -81,7 +81,7 @@ public class ProxyMetricsManager implements StartAndShutdown {
         LABEL_MAP.put(LABEL_CLUSTER_NAME, proxyConfig.getProxyClusterName());
         LABEL_MAP.put(LABEL_NODE_ID, proxyConfig.getProxyName());
         LABEL_MAP.put(LABEL_PROXY_MODE, 
proxyConfig.getProxyMode().toLowerCase());
-        initMetrics(brokerMetricsManager.getBrokerMeter(), 
BrokerMetricsManager::newAttributesBuilder);
+        initMetrics(brokerMetricsManager.getBrokerMeter(), 
brokerMetricsManager::newAttributesBuilder);
     }
 
     public static ProxyMetricsManager initClusterMode(ProxyConfig proxyConfig) 
{
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2bdd058f3f..4d13acf225 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -160,6 +160,7 @@ public class DefaultMessageStore implements MessageStore {
 
     protected StoreCheckpoint storeCheckpoint;
     private TimerMessageStore timerMessageStore;
+    private final DefaultStoreMetricsManager defaultStoreMetricsManager;
 
     private final LinkedList<CommitLogDispatcher> dispatcherList = new 
LinkedList<>();
 
@@ -237,6 +238,8 @@ public class DefaultMessageStore implements MessageStore {
 
         this.transientStorePool = new 
TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), 
messageStoreConfig.getMappedFileSizeCommitLog());
 
+        this.defaultStoreMetricsManager = new DefaultStoreMetricsManager();
+
         this.scheduledExecutorService =
             ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
 
@@ -2970,12 +2973,12 @@ public class DefaultMessageStore implements 
MessageStore {
 
     @Override
     public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
-        return DefaultStoreMetricsManager.getMetricsView();
+        return this.defaultStoreMetricsManager.getMetricsView();
     }
 
     @Override
     public void initMetrics(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier) {
-        DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
+        this.defaultStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
     }
 
     /**
@@ -3021,4 +3024,8 @@ public class DefaultMessageStore implements MessageStore {
         this.notifyMessageArriveInBatch = notifyMessageArriveInBatch;
     }
 
+    public DefaultStoreMetricsManager getDefaultStoreMetricsManager() {
+        return defaultStoreMetricsManager;
+    }
+
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index ef72de8baa..8d3963bb4a 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -64,26 +64,29 @@ import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABE
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
 
 public class DefaultStoreMetricsManager {
-    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
-    public static MessageStoreConfig messageStoreConfig;
-
-    public static ObservableLongGauge storageSize = new 
NopObservableLongGauge();
-    public static ObservableLongGauge flushBehind = new 
NopObservableLongGauge();
-    public static ObservableLongGauge dispatchBehind = new 
NopObservableLongGauge();
-    public static ObservableLongGauge messageReserveTime = new 
NopObservableLongGauge();
-
-    public static ObservableLongGauge timerEnqueueLag = new 
NopObservableLongGauge();
-    public static ObservableLongGauge timerEnqueueLatency = new 
NopObservableLongGauge();
-    public static ObservableLongGauge timerDequeueLag = new 
NopObservableLongGauge();
-    public static ObservableLongGauge timerDequeueLatency = new 
NopObservableLongGauge();
-    public static ObservableLongGauge timingMessages = new 
NopObservableLongGauge();
-
-    public static LongCounter timerDequeueTotal = new NopLongCounter();
-    public static LongCounter timerEnqueueTotal = new NopLongCounter();
-    public static ObservableLongGauge timerMessageSnapshot = new 
NopObservableLongGauge();
-    public static LongHistogram timerMessageSetLatency = new 
NopLongHistogram();
-
-    public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() 
{
+    private Supplier<AttributesBuilder> attributesBuilderSupplier;
+    private MessageStoreConfig messageStoreConfig;
+
+    private ObservableLongGauge storageSize = new NopObservableLongGauge();
+    private ObservableLongGauge flushBehind = new NopObservableLongGauge();
+    private ObservableLongGauge dispatchBehind = new NopObservableLongGauge();
+    private ObservableLongGauge messageReserveTime = new 
NopObservableLongGauge();
+
+    private ObservableLongGauge timerEnqueueLag = new NopObservableLongGauge();
+    private ObservableLongGauge timerEnqueueLatency = new 
NopObservableLongGauge();
+    private ObservableLongGauge timerDequeueLag = new NopObservableLongGauge();
+    private ObservableLongGauge timerDequeueLatency = new 
NopObservableLongGauge();
+    private ObservableLongGauge timingMessages = new NopObservableLongGauge();
+
+    private LongCounter timerDequeueTotal = new NopLongCounter();
+    private LongCounter timerEnqueueTotal = new NopLongCounter();
+    private ObservableLongGauge timerMessageSnapshot = new 
NopObservableLongGauge();
+    private LongHistogram timerMessageSetLatency = new NopLongHistogram();
+
+    public DefaultStoreMetricsManager() {
+    }
+
+    public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
         List<Double> rpcCostTimeBuckets = Arrays.asList(
                 // day * hour * min * second
                 1d * 1 * 1 * 60, // 60 second
@@ -102,42 +105,42 @@ public class DefaultStoreMetricsManager {
         return Lists.newArrayList(new Pair<>(selector, viewBuilder));
     }
 
-    public static void init(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier,
+    public void init(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier,
         DefaultMessageStore messageStore) {
 
         // Also add some metrics for rocksdb's monitoring.
         RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, 
messageStore.getQueueStore());
 
-        DefaultStoreMetricsManager.attributesBuilderSupplier = 
attributesBuilderSupplier;
-        DefaultStoreMetricsManager.messageStoreConfig = 
messageStore.getMessageStoreConfig();
+        this.attributesBuilderSupplier = attributesBuilderSupplier;
+        this.messageStoreConfig = messageStore.getMessageStoreConfig();
 
-        storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
+        this.storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
             .setDescription("Broker storage size")
             .setUnit("bytes")
             .ofLongs()
             .buildWithCallback(measurement -> {
-                File storeDir = new 
File(messageStoreConfig.getStorePathRootDir());
+                File storeDir = new 
File(this.messageStoreConfig.getStorePathRootDir());
                 if (storeDir.exists() && storeDir.isDirectory()) {
                     long totalSpace = storeDir.getTotalSpace();
                     if (totalSpace > 0) {
-                        measurement.record(totalSpace - 
storeDir.getFreeSpace(), newAttributesBuilder().build());
+                        measurement.record(totalSpace - 
storeDir.getFreeSpace(), this.newAttributesBuilder().build());
                     }
                 }
             });
 
-        flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND)
+        this.flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND)
             .setDescription("Broker flush behind bytes")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(measurement -> 
measurement.record(messageStore.flushBehindBytes(), 
newAttributesBuilder().build()));
+            .buildWithCallback(measurement -> 
measurement.record(messageStore.flushBehindBytes(), 
this.newAttributesBuilder().build()));
 
-        dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND)
+        this.dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND)
             .setDescription("Broker dispatch behind bytes")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(measurement -> 
measurement.record(messageStore.dispatchBehindBytes(), 
newAttributesBuilder().build()));
+            .buildWithCallback(measurement -> 
measurement.record(messageStore.dispatchBehindBytes(), 
this.newAttributesBuilder().build()));
 
-        messageReserveTime = 
meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
+        this.messageReserveTime = 
meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
             .setDescription("Broker message reserve time")
             .setUnit("milliseconds")
             .ofLongs()
@@ -146,42 +149,42 @@ public class DefaultStoreMetricsManager {
                 if (earliestMessageTime <= 0) {
                     return;
                 }
-                measurement.record(System.currentTimeMillis() - 
earliestMessageTime, newAttributesBuilder().build());
+                measurement.record(System.currentTimeMillis() - 
earliestMessageTime, this.newAttributesBuilder().build());
             });
 
         if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
-            timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+            this.timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
                 .setDescription("Timer enqueue messages lag")
                 .ofLongs()
                 .buildWithCallback(measurement -> {
                     TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                    
measurement.record(timerMessageStore.getEnqueueBehindMessages(), 
newAttributesBuilder().build());
+                    
measurement.record(timerMessageStore.getEnqueueBehindMessages(), 
this.newAttributesBuilder().build());
                 });
 
-            timerEnqueueLatency = 
meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+            this.timerEnqueueLatency = 
meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
                 .setDescription("Timer enqueue latency")
                 .setUnit("milliseconds")
                 .ofLongs()
                 .buildWithCallback(measurement -> {
                     TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                    
measurement.record(timerMessageStore.getEnqueueBehindMillis(), 
newAttributesBuilder().build());
+                    
measurement.record(timerMessageStore.getEnqueueBehindMillis(), 
this.newAttributesBuilder().build());
                 });
-            timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+            this.timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
                 .setDescription("Timer dequeue messages lag")
                 .ofLongs()
                 .buildWithCallback(measurement -> {
                     TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                    
measurement.record(timerMessageStore.getDequeueBehindMessages(), 
newAttributesBuilder().build());
+                    
measurement.record(timerMessageStore.getDequeueBehindMessages(), 
this.newAttributesBuilder().build());
                 });
-            timerDequeueLatency = 
meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+            this.timerDequeueLatency = 
meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
                 .setDescription("Timer dequeue latency")
                 .setUnit("milliseconds")
                 .ofLongs()
                 .buildWithCallback(measurement -> {
                     TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                    measurement.record(timerMessageStore.getDequeueBehind(), 
newAttributesBuilder().build());
+                    measurement.record(timerMessageStore.getDequeueBehind(), 
this.newAttributesBuilder().build());
                 });
-            timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+            this.timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
                 .setDescription("Current message number in timing")
                 .ofLongs()
                 .buildWithCallback(measurement -> {
@@ -191,23 +194,23 @@ public class DefaultStoreMetricsManager {
                         .forEach((topic, metric) -> {
                             measurement.record(
                                 metric.getCount().get(),
-                                newAttributesBuilder().put(LABEL_TOPIC, 
topic).build()
+                                this.newAttributesBuilder().put(LABEL_TOPIC, 
topic).build()
                             );
                         });
                 });
-            timerDequeueTotal = 
meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+            this.timerDequeueTotal = 
meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
                 .setDescription("Total number of timer dequeue")
                 .build();
-            timerEnqueueTotal = 
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+            this.timerEnqueueTotal = 
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
                 .setDescription("Total number of timer enqueue")
                 .build();
-            timerMessageSnapshot = 
meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
+            this.timerMessageSnapshot = 
meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
                 .setDescription("Timer message distribution snapshot, only 
count timing messages in 24h.")
                 .ofLongs()
                 .buildWithCallback(measurement -> {
                     TimerMetrics timerMetrics = 
messageStore.getTimerMessageStore().getTimerMetrics();
                     TimerWheel timerWheel = 
messageStore.getTimerMessageStore().getTimerWheel();
-                    int precisionMs = messageStoreConfig.getTimerPrecisionMs();
+                    int precisionMs = 
this.messageStoreConfig.getTimerPrecisionMs();
                     List<Integer> timerDist = timerMetrics.getTimerDistList();
                     long currTime = System.currentTimeMillis() / precisionMs * 
precisionMs;
                     for (int i = 0; i < timerDist.size(); i++) {
@@ -218,10 +221,10 @@ public class DefaultStoreMetricsManager {
                             Slot slotEach = timerWheel.getSlot(currTime + 
(long) j * precisionMs);
                             periodTotal += slotEach.num;
                         }
-                        measurement.record(periodTotal, 
newAttributesBuilder().put(LABEL_TIMING_BOUND, 
timerDist.get(i).toString()).build());
+                        measurement.record(periodTotal, 
this.newAttributesBuilder().put(LABEL_TIMING_BOUND, 
timerDist.get(i).toString()).build());
                     }
                 });
-            timerMessageSetLatency = 
meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
+            this.timerMessageSetLatency = 
meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
                     .setDescription("Timer message set latency distribution")
                     .setUnit("seconds")
                     .ofLongs()
@@ -229,26 +232,96 @@ public class DefaultStoreMetricsManager {
         }
     }
 
-    public static void incTimerDequeueCount(String topic) {
-        timerDequeueTotal.add(1, newAttributesBuilder()
+    public void incTimerDequeueCount(String topic) {
+        this.timerDequeueTotal.add(1, this.newAttributesBuilder()
             .put(LABEL_TOPIC, topic)
             .build());
     }
 
-    public static void incTimerEnqueueCount(String topic) {
-        AttributesBuilder attributesBuilder = newAttributesBuilder();
+    public void incTimerEnqueueCount(String topic) {
+        AttributesBuilder attributesBuilder = this.newAttributesBuilder();
         if (topic != null) {
             attributesBuilder.put(LABEL_TOPIC, topic);
         }
-        timerEnqueueTotal.add(1, attributesBuilder.build());
+        this.timerEnqueueTotal.add(1, attributesBuilder.build());
     }
 
-    public static AttributesBuilder newAttributesBuilder() {
-        if (attributesBuilderSupplier == null) {
+    public AttributesBuilder newAttributesBuilder() {
+        if (this.attributesBuilderSupplier == null) {
             return Attributes.builder();
         }
-        return attributesBuilderSupplier.get()
+        return this.attributesBuilderSupplier.get()
             .put(LABEL_STORAGE_TYPE, DEFAULT_STORAGE_TYPE)
             .put(LABEL_STORAGE_MEDIUM, DEFAULT_STORAGE_MEDIUM);
     }
+
+    // Getter methods for external access
+    public Supplier<AttributesBuilder> getAttributesBuilderSupplier() {
+        return attributesBuilderSupplier;
+    }
+
+    public MessageStoreConfig getMessageStoreConfig() {
+        return messageStoreConfig;
+    }
+
+    public ObservableLongGauge getStorageSize() {
+        return storageSize;
+    }
+
+    public ObservableLongGauge getFlushBehind() {
+        return flushBehind;
+    }
+
+    public ObservableLongGauge getDispatchBehind() {
+        return dispatchBehind;
+    }
+
+    public ObservableLongGauge getMessageReserveTime() {
+        return messageReserveTime;
+    }
+
+    public ObservableLongGauge getTimerEnqueueLag() {
+        return timerEnqueueLag;
+    }
+
+    public ObservableLongGauge getTimerEnqueueLatency() {
+        return timerEnqueueLatency;
+    }
+
+    public ObservableLongGauge getTimerDequeueLag() {
+        return timerDequeueLag;
+    }
+
+    public ObservableLongGauge getTimerDequeueLatency() {
+        return timerDequeueLatency;
+    }
+
+    public ObservableLongGauge getTimingMessages() {
+        return timingMessages;
+    }
+
+    public LongCounter getTimerDequeueTotal() {
+        return timerDequeueTotal;
+    }
+
+    public LongCounter getTimerEnqueueTotal() {
+        return timerEnqueueTotal;
+    }
+
+    public ObservableLongGauge getTimerMessageSnapshot() {
+        return timerMessageSnapshot;
+    }
+
+    public LongHistogram getTimerMessageSetLatency() {
+        return timerMessageSetLatency;
+    }
+
+    // Setter methods for testing
+    public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> 
attributesBuilderSupplier) {
+        this.attributesBuilderSupplier = attributesBuilderSupplier;
+    }
+
+    public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
+        this.messageStoreConfig = messageStoreConfig;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index d6af7b84e7..1f51a063d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -701,9 +701,13 @@ public class TimerMessageStore {
                                 return false;
                             }
                         }
-                        Attributes attributes = 
DefaultStoreMetricsManager.newAttributesBuilder()
+                        // Record timer message set latency
+                        if (messageStore instanceof DefaultMessageStore) {
+                            DefaultStoreMetricsManager metricsManager = 
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager();
+                            Attributes attributes = 
metricsManager.newAttributesBuilder()
                                 .put(DefaultStoreMetricsConstant.LABEL_TOPIC, 
msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
-                        
DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime - 
msgExt.getBornTimestamp()) / 1000, attributes);
+                            
metricsManager.getTimerMessageSetLatency().record((delayedTime - 
msgExt.getBornTimestamp()) / 1000, attributes);
+                        }
                     }
                 } catch (Exception e) {
                     // here may cause the message loss
@@ -1366,7 +1370,9 @@ public class TimerMessageStore {
         protected void putMessageToTimerWheel(TimerRequest req) {
             try {
                 perfCounterTicks.startTick(ENQUEUE_PUT);
-                
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
+                if (messageStore instanceof DefaultMessageStore) {
+                    ((DefaultMessageStore) 
messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg()));
+                }
                 if (shouldRunningDequeue && req.getDelayTime() < 
currWriteTimeMs) {
                     req.setEnqueueTime(Long.MAX_VALUE);
                     dequeuePutQueue.put(req);
@@ -1502,7 +1508,9 @@ public class TimerMessageStore {
                                 perfCounterTicks.startTick(DEQUEUE_PUT);
 
                                 MessageExt msgExt = tr.getMsg();
-                                
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+                                if (messageStore instanceof 
DefaultMessageStore) {
+                                    ((DefaultMessageStore) 
messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt));
+                                }
 
                                 if (tr.getEnqueueTime() == Long.MAX_VALUE) {
                                     // Never enqueue, mark it.

Reply via email to