This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7967edfc5e [ISSUE #9677] Resolve metrics static variable conflicts in
BrokerContainer mode (#9678)
7967edfc5e is described below
commit 7967edfc5e28041a6019d3b3ce353fcef0eb7d3f
Author: rongtong <[email protected]>
AuthorDate: Thu Sep 11 16:29:20 2025 +0800
[ISSUE #9677] Resolve metrics static variable conflicts in BrokerContainer
mode (#9678)
* Cherry-pick partial changes from 67db1757df - BrokerMetricsManager
refactoring
- Excluded DefaultMappedFile, METRICS_REFACTORING_GUIDE.md, TimerMetrics,
and RocksDB files per requirements
- Successfully applied changes to most broker and store modules
- Compilation errors in TimerMessageStore.java will be fixed in the next commit
* fix: Resolve metrics static variable conflicts in BrokerContainer mode
Convert static metrics variables to instance-level to fix resource leaks and
data conflicts in BrokerContainer scenarios with multiple broker instances.
## Problem Statement
In BrokerContainer mode, multiple broker instances share static metrics variables
from BrokerMetricsManager and DefaultStoreMetricsManager, causing:
- Metrics data conflicts between different broker instances
- Resource leaks during frequent addBroker/removeBroker operations
- Incorrect metrics aggregation across multiple brokers
## Solution
- Convert static metrics variables to instance-level variables
- Add proper getter methods for external access
- Ensure each broker instance maintains isolated metrics
- Apply instanceof checks for type safety in TimerMessageStore
## Files Modified
- broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
- store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
- METRICS_REFACTORING_GUIDE.md (documentation)
## Key Benefits
✅ Eliminates metrics conflicts between broker instances
✅ Prevents resource leaks in dynamic broker scenarios
✅ Maintains proper metrics isolation per broker
✅ Supports BrokerContainer mode with multiple brokers
✅ Backward compatible with existing functionality
Resolves metrics static variable issues in multi-broker container
environments.
* Delete useless file
* Fix tests that could not pass
* Fix tests that could not pass
---
.../apache/rocketmq/broker/BrokerController.java | 4 +
.../broker/metrics/BrokerMetricsManager.java | 121 ++++++++++----
.../processor/AbstractSendMessageProcessor.java | 4 +-
.../broker/processor/AdminBrokerProcessor.java | 18 +-
.../processor/DefaultPullMessageResultHandler.java | 6 +-
.../broker/processor/EndTransactionProcessor.java | 8 +-
.../broker/processor/PeekMessageProcessor.java | 8 +-
.../broker/processor/PopMessageProcessor.java | 8 +-
.../broker/processor/PopReviveService.java | 8 +-
.../broker/processor/ReplyMessageProcessor.java | 8 +-
.../broker/processor/SendMessageProcessor.java | 12 +-
.../broker/schedule/ScheduleMessageService.java | 16 +-
.../queue/TransactionalMessageBridge.java | 8 +-
.../rocketmq/broker/BrokerControllerTest.java | 9 +
.../broker/metrics/BrokerMetricsManagerTest.java | 29 +++-
.../broker/processor/AdminBrokerProcessorTest.java | 3 +
.../processor/EndTransactionProcessorTest.java | 3 +
.../broker/processor/PeekMessageProcessorTest.java | 3 +
.../broker/processor/PopMessageProcessorTest.java | 3 +
.../broker/processor/PullMessageProcessorTest.java | 3 +
.../processor/ReplyMessageProcessorTest.java | 3 +
.../broker/processor/SendMessageProcessorTest.java | 3 +
.../schedule/ScheduleMessageServiceTest.java | 20 ++-
.../proxy/metrics/ProxyMetricsManager.java | 2 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 11 +-
.../store/metrics/DefaultStoreMetricsManager.java | 185 ++++++++++++++-------
.../rocketmq/store/timer/TimerMessageStore.java | 16 +-
27 files changed, 365 insertions(+), 157 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 33331bbabb..0cdec87d5e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -500,6 +500,10 @@ public class BrokerController {
return brokerMetricsManager;
}
+ public void setBrokerMetricsManager(BrokerMetricsManager
brokerMetricsManager) {
+ this.brokerMetricsManager = brokerMetricsManager;
+ }
+
protected void initializeRemotingServer() throws
CloneNotSupportedException {
RemotingServer tcpRemotingServer = new
NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig)
this.nettyServerConfig.clone();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 6300d763d6..5a32cf3c67 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -121,46 +121,45 @@ public class BrokerMetricsManager {
private final MessageStore messageStore;
private final BrokerController brokerController;
private final ConsumerLagCalculator consumerLagCalculator;
- private final static Map<String, String> LABEL_MAP = new HashMap<>();
+ private final Map<String, String> labelMap = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
private MetricExporter loggingMetricExporter;
private Meter brokerMeter;
- public static Supplier<AttributesBuilder> attributesBuilderSupplier =
Attributes::builder;
+ private Supplier<AttributesBuilder> attributesBuilderSupplier =
Attributes::builder;
// broker stats metrics
- public static ObservableLongGauge processorWatermark = new
NopObservableLongGauge();
- public static ObservableLongGauge brokerPermission = new
NopObservableLongGauge();
- public static ObservableLongGauge topicNum = new NopObservableLongGauge();
- public static ObservableLongGauge consumerGroupNum = new
NopObservableLongGauge();
-
+ private ObservableLongGauge processorWatermark = new
NopObservableLongGauge();
+ private ObservableLongGauge brokerPermission = new
NopObservableLongGauge();
+ private ObservableLongGauge topicNum = new NopObservableLongGauge();
+ private ObservableLongGauge consumerGroupNum = new
NopObservableLongGauge();
// request metrics
- public static LongCounter messagesInTotal = new NopLongCounter();
- public static LongCounter messagesOutTotal = new NopLongCounter();
- public static LongCounter throughputInTotal = new NopLongCounter();
- public static LongCounter throughputOutTotal = new NopLongCounter();
- public static LongHistogram messageSize = new NopLongHistogram();
- public static LongHistogram topicCreateExecuteTime = new
NopLongHistogram();
- public static LongHistogram consumerGroupCreateExecuteTime = new
NopLongHistogram();
+ private LongCounter messagesInTotal = new NopLongCounter();
+ private LongCounter messagesOutTotal = new NopLongCounter();
+ private LongCounter throughputInTotal = new NopLongCounter();
+ private LongCounter throughputOutTotal = new NopLongCounter();
+ private LongHistogram messageSize = new NopLongHistogram();
+ private LongHistogram topicCreateExecuteTime = new NopLongHistogram();
+ private LongHistogram consumerGroupCreateExecuteTime = new
NopLongHistogram();
// client connection metrics
- public static ObservableLongGauge producerConnection = new
NopObservableLongGauge();
- public static ObservableLongGauge consumerConnection = new
NopObservableLongGauge();
+ private ObservableLongGauge producerConnection = new
NopObservableLongGauge();
+ private ObservableLongGauge consumerConnection = new
NopObservableLongGauge();
// Lag metrics
- public static ObservableLongGauge consumerLagMessages = new
NopObservableLongGauge();
- public static ObservableLongGauge consumerLagLatency = new
NopObservableLongGauge();
- public static ObservableLongGauge consumerInflightMessages = new
NopObservableLongGauge();
- public static ObservableLongGauge consumerQueueingLatency = new
NopObservableLongGauge();
- public static ObservableLongGauge consumerReadyMessages = new
NopObservableLongGauge();
- public static LongCounter sendToDlqMessages = new NopLongCounter();
- public static ObservableLongGauge halfMessages = new
NopObservableLongGauge();
- public static LongCounter commitMessagesTotal = new NopLongCounter();
- public static LongCounter rollBackMessagesTotal = new NopLongCounter();
- public static LongHistogram transactionFinishLatency = new
NopLongHistogram();
+ private ObservableLongGauge consumerLagMessages = new
NopObservableLongGauge();
+ private ObservableLongGauge consumerLagLatency = new
NopObservableLongGauge();
+ private ObservableLongGauge consumerInflightMessages = new
NopObservableLongGauge();
+ private ObservableLongGauge consumerQueueingLatency = new
NopObservableLongGauge();
+ private ObservableLongGauge consumerReadyMessages = new
NopObservableLongGauge();
+ private LongCounter sendToDlqMessages = new NopLongCounter();
+ private ObservableLongGauge halfMessages = new NopObservableLongGauge();
+ private LongCounter commitMessagesTotal = new NopLongCounter();
+ private LongCounter rollBackMessagesTotal = new NopLongCounter();
+ private LongHistogram transactionFinishLatency = new NopLongHistogram();
@SuppressWarnings("DoubleBraceInitialization")
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new
ArrayList<String>() {
@@ -177,13 +176,13 @@ public class BrokerMetricsManager {
init();
}
- public static AttributesBuilder newAttributesBuilder() {
+ public AttributesBuilder newAttributesBuilder() {
AttributesBuilder attributesBuilder;
if (attributesBuilderSupplier == null) {
attributesBuilderSupplier = Attributes::builder;
}
attributesBuilder = attributesBuilderSupplier.get();
- LABEL_MAP.forEach(attributesBuilder::put);
+ labelMap.forEach(attributesBuilder::put);
return attributesBuilder;
}
@@ -242,6 +241,56 @@ public class BrokerMetricsManager {
return brokerMeter;
}
+ // Getter methods for metrics variables
+ public LongCounter getMessagesInTotal() {
+ return messagesInTotal;
+ }
+
+ public LongCounter getMessagesOutTotal() {
+ return messagesOutTotal;
+ }
+
+ public LongCounter getThroughputInTotal() {
+ return throughputInTotal;
+ }
+
+ public LongCounter getThroughputOutTotal() {
+ return throughputOutTotal;
+ }
+
+ public LongHistogram getMessageSize() {
+ return messageSize;
+ }
+
+ public LongCounter getSendToDlqMessages() {
+ return sendToDlqMessages;
+ }
+
+ public LongCounter getCommitMessagesTotal() {
+ return commitMessagesTotal;
+ }
+
+ public LongCounter getRollBackMessagesTotal() {
+ return rollBackMessagesTotal;
+ }
+
+ public LongHistogram getTransactionFinishLatency() {
+ return transactionFinishLatency;
+ }
+
+ public LongHistogram getTopicCreateExecuteTime() {
+ return topicCreateExecuteTime;
+ }
+
+ public LongHistogram getConsumerGroupCreateExecuteTime() {
+ return consumerGroupCreateExecuteTime;
+ }
+
+ // Setter method for testing purposes
+ public void setAttributesBuilderSupplier(Supplier<AttributesBuilder>
attributesBuilderSupplier) {
+ this.attributesBuilderSupplier = attributesBuilderSupplier;
+ }
+
private boolean checkConfig() {
if (brokerConfig == null) {
return false;
@@ -282,15 +331,15 @@ public class BrokerMetricsManager {
LOGGER.warn("metricsLabel is not valid: {}", labels);
continue;
}
- LABEL_MAP.put(split[0], split[1]);
+ labelMap.put(split[0], split[1]);
}
}
if (brokerConfig.isMetricsInDelta()) {
- LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
+ labelMap.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
}
- LABEL_MAP.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
- LABEL_MAP.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
- LABEL_MAP.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
+ labelMap.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
+ labelMap.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
+ labelMap.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(Resource.empty());
@@ -699,13 +748,13 @@ public class BrokerMetricsManager {
}
private void initOtherMetrics() {
if (brokerConfig.isEnableRemotingMetrics()) {
- RemotingMetricsManager.initMetrics(brokerMeter,
BrokerMetricsManager::newAttributesBuilder);
+ RemotingMetricsManager.initMetrics(brokerMeter,
this::newAttributesBuilder);
}
if (brokerConfig.isEnableMessageStoreMetrics()) {
- messageStore.initMetrics(brokerMeter,
BrokerMetricsManager::newAttributesBuilder);
+ messageStore.initMetrics(brokerMeter, this::newAttributesBuilder);
}
if (brokerConfig.isEnablePopMetrics()) {
- PopMetricsManager.initMetrics(brokerMeter, brokerController,
BrokerMetricsManager::newAttributesBuilder);
+ PopMetricsManager.initMetrics(brokerMeter, brokerController,
this::newAttributesBuilder);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 928bd397e1..2277881209 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -182,12 +182,12 @@ public abstract class AbstractSendMessageProcessor
implements NettyRequestProces
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
- Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
.put(LABEL_TOPIC, requestHeader.getOriginTopic())
.put(LABEL_IS_SYSTEM,
BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(),
requestHeader.getGroup()))
.build();
- BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+
this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1,
attributes);
isDLQ = true;
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4eb78fc1c2..298e239086 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -66,7 +66,7 @@ import
org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.broker.metrics.InvocationStatus;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -614,11 +614,11 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() ==
ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
- Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
- BrokerMetricsManager.topicCreateExecuteTime.record(executionTime,
attributes);
+
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime,
attributes);
}
LOGGER.info("executionTime of create topic:{} is {} ms", topic,
executionTime);
return response;
@@ -695,11 +695,11 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() ==
ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
- Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames))
.build();
- BrokerMetricsManager.topicCreateExecuteTime.record(executionTime,
attributes);
+
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime,
attributes);
}
LOGGER.info("executionTime of all topics:{} is {} ms", topicNames,
executionTime);
return response;
@@ -1551,10 +1551,10 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
- Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
-
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime,
attributes);
+
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime,
attributes);
return response;
}
@@ -1589,10 +1589,10 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
LOGGER.info("executionTime of create
updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime);
InvocationStatus status = response.getCode() ==
ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
- Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
-
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime,
attributes);
+
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime,
attributes);
}
return response;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 43b66b4c51..3725732537 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -122,13 +122,13 @@ public class DefaultPullMessageResultHandler implements
PullMessageResultHandler
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(),
getMessageResult.getMessageCount());
if
(!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
.build();
-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
attributes);
}
if (!channelIsWritable(channel, requestHeader)) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index 468a8791d4..153ac24c1f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.TopicFilterType;
@@ -149,12 +149,12 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// successful committed, then total num of
half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(),
-1);
- BrokerMetricsManager.commitMessagesTotal.add(1,
BrokerMetricsManager.newAttributesBuilder()
+
this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1,
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
// record the commit latency.
Long commitLatency = (System.currentTimeMillis() -
result.getPrepareMessage().getBornTimestamp()) / 1000;
-
BrokerMetricsManager.transactionFinishLatency.record(commitLatency,
BrokerMetricsManager.newAttributesBuilder()
+
this.brokerController.getBrokerMetricsManager().getTransactionFinishLatency().record(commitLatency,
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
}
@@ -176,7 +176,7 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// roll back, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC),
-1);
- BrokerMetricsManager.rollBackMessagesTotal.add(1,
BrokerMetricsManager.newAttributesBuilder()
+
this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1,
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC,
result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
.build());
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 40117b74a5..584d248ab2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
@@ -255,13 +255,13 @@ public class PeekMessageProcessor implements
NettyRequestProcessor {
}
if (getMessageTmpResult != null) {
if (!getMessageTmpResult.getMessageMapedList().isEmpty() &&
!isRetry) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
.build();
-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
attributes);
}
for (SelectMappedBufferResult mappedBuffer :
getMessageTmpResult.getMessageMapedList()) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 7d98705576..83ca35091e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.broker.longpolling.PollingHeader;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.longpolling.PopRequest;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.pop.PopConsumerContext;
import org.apache.rocketmq.common.BrokerConfig;
@@ -788,14 +788,14 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
topic,
result.getBufferTotalSize());
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP,
requestHeader.getConsumerGroup())
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(requestHeader.getTopic()) ||
MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
.put(LABEL_IS_RETRY, isRetry)
.build();
-
BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes);
-
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(result.getMessageCount(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(result.getBufferTotalSize(),
attributes);
if (isOrder) {
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(),
isRetry, topic,
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 2be41a69d6..ea7f177f39 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -31,7 +31,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -223,13 +223,13 @@ public class PopReviveService extends ServiceThread {
brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic,
queueId,
brokerController.getMessageStore().now() -
foundList.get(foundList.size() - 1).getStoreTimestamp());
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, topic)
.put(LABEL_CONSUMER_GROUP, group)
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
.build();
-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
attributes);
break;
case NO_MATCHED_MESSAGE:
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index a70b48debe..9b2bbc34e8 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -298,14 +298,14 @@ public class ReplyMessageProcessor extends
AbstractSendMessageProcessor {
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(),
putMessageResult.getAppendMessageResult().getMsgNum());
if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, msg.getTopic())
.put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(msg.getTopic()))
.build();
-
BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(),
attributes);
-
BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(),
attributes);
-
BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes()
/ putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(putMessageResult.getAppendMessageResult().getMsgNum(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(putMessageResult.getAppendMessageResult().getWroteBytes(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessageSize().record(putMessageResult.getAppendMessageResult().getWroteBytes()
/ putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
}
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 669cd5e677..6d60290a58 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -207,12 +207,12 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
if (reconsumeTimes > maxReconsumeTimes ||
sendRetryMessageToDeadLetterQueueDirectly) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP,
requestHeader.getProducerGroup())
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_IS_SYSTEM,
BrokerMetricsManager.isSystem(requestHeader.getTopic(),
requestHeader.getProducerGroup()))
.build();
- BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+
this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1,
attributes);
properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
newTopic = MixAll.getDLQTopic(groupName);
@@ -468,14 +468,14 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
(int) (this.brokerController.getMessageStore().now() -
beginTimeMillis));
if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, msg.getTopic())
.put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(msg.getTopic()))
.build();
-
BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(),
attributes);
-
BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(),
attributes);
-
BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes()
/ putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(putMessageResult.getAppendMessageResult().getMsgNum(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(putMessageResult.getAppendMessageResult().getWroteBytes(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessageSize().record(putMessageResult.getAppendMessageResult().getWroteBytes()
/ putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
}
response.setRemark(null);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 25c24aff98..bec75fe2fb 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -722,26 +722,26 @@ public class ScheduleMessageService extends ConfigManager
{
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
result.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP,
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
result.getAppendMessageResult().getWroteBytes());
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)
.put(LABEL_CONSUMER_GROUP, MixAll.SCHEDULE_CONSUMER_GROUP)
.put(LABEL_IS_SYSTEM, true)
.build();
-
BrokerMetricsManager.messagesOutTotal.add(result.getAppendMessageResult().getMsgNum(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(result.getAppendMessageResult().getWroteBytes(),
attributes);
+
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(result.getAppendMessageResult().getMsgNum(),
attributes);
+
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(result.getAppendMessageResult().getWroteBytes(),
attributes);
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic,
result.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic,
result.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic,
result.getAppendMessageResult().getMsgNum());
- attributes = BrokerMetricsManager.newAttributesBuilder()
+ attributes =
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, topic)
.put(LABEL_MESSAGE_TYPE,
TopicMessageType.DELAY.getMetricsValue())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
-
BrokerMetricsManager.messagesInTotal.add(result.getAppendMessageResult().getMsgNum(),
attributes);
-
BrokerMetricsManager.throughputInTotal.add(result.getAppendMessageResult().getWroteBytes(),
attributes);
-
BrokerMetricsManager.messageSize.record(result.getAppendMessageResult().getWroteBytes()
/ result.getAppendMessageResult().getMsgNum(), attributes);
+
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(result.getAppendMessageResult().getMsgNum(),
attributes);
+
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(result.getAppendMessageResult().getWroteBytes(),
attributes);
+
ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessageSize().record(result.getAppendMessageResult().getWroteBytes()
/ result.getAppendMessageResult().getMsgNum(), attributes);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
index 2383f4f917..5fcc1f56b7 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
@@ -27,7 +27,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
@@ -146,13 +146,13 @@ public class TransactionalMessageBridge {
this.brokerController.getMessageStore().now() -
foundList.get(foundList.size() - 1)
.getStoreTimestamp());
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder()
+ Attributes attributes =
this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, topic)
.put(LABEL_CONSUMER_GROUP, group)
.put(LABEL_IS_SYSTEM,
TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
.build();
-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(),
attributes);
-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(),
attributes);
+
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(),
attributes);
break;
case NO_MATCHED_MESSAGE:
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 789143234f..24a26b2350 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -75,6 +75,15 @@ public class BrokerControllerTest {
brokerController.shutdown();
}
+ @Test
+ public void testBrokerMetricsManagerInitialization() throws Exception {
+ BrokerController brokerController = new BrokerController(brokerConfig,
nettyServerConfig, new NettyClientConfig(), messageStoreConfig);
+ assertThat(brokerController.initialize()).isTrue();
+ // Verify that brokerMetricsManager is properly initialized and not null
+ assertThat(brokerController.getBrokerMetricsManager()).isNotNull();
+ brokerController.shutdown();
+ }
+
@After
public void destroy() {
UtilAll.deleteFile(new File(messageStoreConfig.getStorePathRootDir()));
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
index 9264eb4b56..9e4cfa70c1 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java
@@ -43,16 +43,36 @@ import static org.assertj.core.api.Assertions.assertThat;
public class BrokerMetricsManagerTest {
+ private BrokerMetricsManager createTestBrokerMetricsManager() {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ String storePathRootDir = System.getProperty("java.io.tmpdir") +
File.separator + "store-"
+ + UUID.randomUUID();
+ messageStoreConfig.setStorePathRootDir(storePathRootDir);
+ BrokerConfig brokerConfig = new BrokerConfig();
+
+ NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ nettyServerConfig.setListenPort(0);
+
+ BrokerController brokerController = new BrokerController(brokerConfig,
nettyServerConfig,
+ new NettyClientConfig(), messageStoreConfig);
+
+ return new BrokerMetricsManager(brokerController);
+ }
+
@Test
public void testNewAttributesBuilder() {
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder().put("a", "b")
+ BrokerMetricsManager metricsManager = createTestBrokerMetricsManager();
+ Attributes attributes = metricsManager.newAttributesBuilder().put("a",
"b")
.build();
assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b");
}
@Test
public void testCustomizedAttributesBuilder() {
- BrokerMetricsManager.attributesBuilderSupplier = () -> new
AttributesBuilder() {
+ BrokerMetricsManager metricsManager = createTestBrokerMetricsManager();
+
+ // Create a custom attributes builder supplier for testing
+ metricsManager.setAttributesBuilderSupplier(() -> new
AttributesBuilder() {
private AttributesBuilder attributesBuilder = Attributes.builder();
@Override
@@ -77,8 +97,9 @@ public class BrokerMetricsManagerTest {
attributesBuilder.putAll(attributes);
return this;
}
- };
- Attributes attributes =
BrokerMetricsManager.newAttributesBuilder().put("a", "b")
+ });
+
+ Attributes attributes = metricsManager.newAttributesBuilder().put("a",
"b")
.build();
assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b");
assertThat(attributes.get(AttributeKey.stringKey("customized"))).isEqualTo("value");
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index f3d0eb0782..1bf99eadfb 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
@@ -238,6 +239,8 @@ public class AdminBrokerProcessorTest {
brokerController.setMessageStore(messageStore);
brokerController.setAuthenticationMetadataManager(authenticationMetadataManager);
brokerController.setAuthorizationMetadataManager(authorizationMetadataManager);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
Field field = BrokerController.class.getDeclaredField("broker2Client");
field.setAccessible(true);
field.set(brokerController, broker2Client);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index e4360f147b..1751ad96fd 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -83,6 +84,8 @@ public class EndTransactionProcessorTest {
when(transactionMsgService.getTransactionMetrics()).thenReturn(transactionMetrics);
brokerController.setMessageStore(messageStore);
brokerController.setTransactionalMessageService(transactionMsgService);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
endTransactionProcessor = new
EndTransactionProcessor(brokerController);
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
index 9baf2a6ebb..5722b031b0 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
@@ -86,6 +87,8 @@ public class PeekMessageProcessorTest {
@Before
public void init() {
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
peekMessageProcessor = new PeekMessageProcessor(brokerController);
when(brokerController.getMessageStore()).thenReturn(messageStore);
topicConfigManager = new TopicConfigManager(brokerController);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index fdb0690e5d..59559d3cfd 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
@@ -78,6 +79,8 @@ public class PopMessageProcessorTest {
public void init() {
brokerController.setMessageStore(messageStore);
brokerController.getBrokerConfig().setEnablePopBufferMerge(true);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
popMessageProcessor = new PopMessageProcessor(brokerController);
when(handlerContext.channel()).thenReturn(embeddedChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig(topic));
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index 83c3011185..cecd1ff86a 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -82,6 +83,8 @@ public class PullMessageProcessorTest {
@Before
public void init() {
brokerController.setMessageStore(messageStore);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
SubscriptionGroupManager subscriptionGroupManager = new
SubscriptionGroupManager(brokerController);
pullMessageProcessor = new PullMessageProcessor(brokerController);
when(brokerController.getPullMessageProcessor()).thenReturn(pullMessageProcessor);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
index 266c8491cb..03af9b948b 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -81,6 +82,8 @@ public class ReplyMessageProcessorTest {
public void init() throws IllegalAccessException, NoSuchFieldException {
clientInfo = new ClientChannelInfo(channel, "127.0.0.1",
LanguageCode.JAVA, 0);
brokerController.setMessageStore(messageStore);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
Field field = BrokerController.class.getDeclaredField("broker2Client");
field.setAccessible(true);
field.set(brokerController, broker2Client);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 9da6a96ec9..ce8b3405f6 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.apache.commons.codec.DecoderException;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -102,6 +103,8 @@ public class SendMessageProcessorTest {
@Before
public void init() {
brokerController.setMessageStore(messageStore);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ brokerController.setBrokerMetricsManager(new
BrokerMetricsManager(brokerController));
TopicConfigManager topicConfigManager = new
TopicConfigManager(brokerController);
topicConfigManager.getTopicConfigTable().put(topic, new
TopicConfig(topic));
SubscriptionGroupManager subscriptionGroupManager = new
SubscriptionGroupManager(brokerController);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
index b90fb2931d..675c9a57c8 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.failover.EscapeBridge;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.util.HookUtils;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -48,6 +49,9 @@ import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import io.opentelemetry.api.common.Attributes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -132,10 +136,22 @@ public class ScheduleMessageServiceTest {
Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(manager);
EscapeBridge escapeBridge = new EscapeBridge(brokerController);
Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
- scheduleMessageService = new ScheduleMessageService(brokerController);
+ // Initialize BrokerMetricsManager to prevent NPE in tests
+ BrokerMetricsManager brokerMetricsManager =
Mockito.mock(BrokerMetricsManager.class);
+ // Mock newAttributesBuilder to return a valid AttributesBuilder instead of null
+
Mockito.when(brokerMetricsManager.newAttributesBuilder()).thenReturn(Attributes.builder());
+ // Mock metrics getter methods to return Nop implementations to prevent NPE
+ Mockito.when(brokerMetricsManager.getMessagesInTotal()).thenReturn(new
NopLongCounter());
+
Mockito.when(brokerMetricsManager.getMessagesOutTotal()).thenReturn(new
NopLongCounter());
+
Mockito.when(brokerMetricsManager.getThroughputInTotal()).thenReturn(new
NopLongCounter());
+
Mockito.when(brokerMetricsManager.getThroughputOutTotal()).thenReturn(new
NopLongCounter());
+ Mockito.when(brokerMetricsManager.getMessageSize()).thenReturn(new
NopLongHistogram());
+
Mockito.when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
+ scheduleMessageService = Mockito.spy(new
ScheduleMessageService(brokerController));
+ // Mock ScheduleMessageService before it's used in HookUtils
+
Mockito.when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
scheduleMessageService.load();
scheduleMessageService.start();
-
Mockito.when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
}
@Test
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
index 2b8dac5d8b..81db576e3d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
@@ -81,7 +81,7 @@ public class ProxyMetricsManager implements StartAndShutdown {
LABEL_MAP.put(LABEL_CLUSTER_NAME, proxyConfig.getProxyClusterName());
LABEL_MAP.put(LABEL_NODE_ID, proxyConfig.getProxyName());
LABEL_MAP.put(LABEL_PROXY_MODE,
proxyConfig.getProxyMode().toLowerCase());
- initMetrics(brokerMetricsManager.getBrokerMeter(),
BrokerMetricsManager::newAttributesBuilder);
+ initMetrics(brokerMetricsManager.getBrokerMeter(),
brokerMetricsManager::newAttributesBuilder);
}
public static ProxyMetricsManager initClusterMode(ProxyConfig proxyConfig)
{
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2bdd058f3f..4d13acf225 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -160,6 +160,7 @@ public class DefaultMessageStore implements MessageStore {
protected StoreCheckpoint storeCheckpoint;
private TimerMessageStore timerMessageStore;
+ private final DefaultStoreMetricsManager defaultStoreMetricsManager;
private final LinkedList<CommitLogDispatcher> dispatcherList = new
LinkedList<>();
@@ -237,6 +238,8 @@ public class DefaultMessageStore implements MessageStore {
this.transientStorePool = new
TransientStorePool(messageStoreConfig.getTransientStorePoolSize(),
messageStoreConfig.getMappedFileSizeCommitLog());
+ this.defaultStoreMetricsManager = new DefaultStoreMetricsManager();
+
this.scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -2970,12 +2973,12 @@ public class DefaultMessageStore implements
MessageStore {
@Override
public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
- return DefaultStoreMetricsManager.getMetricsView();
+ return this.defaultStoreMetricsManager.getMetricsView();
}
@Override
public void initMetrics(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier) {
- DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier,
this);
+ this.defaultStoreMetricsManager.init(meter, attributesBuilderSupplier,
this);
}
/**
@@ -3021,4 +3024,8 @@ public class DefaultMessageStore implements MessageStore {
this.notifyMessageArriveInBatch = notifyMessageArriveInBatch;
}
+ public DefaultStoreMetricsManager getDefaultStoreMetricsManager() {
+ return defaultStoreMetricsManager;
+ }
+
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index ef72de8baa..8d3963bb4a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -64,26 +64,29 @@ import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABE
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
public class DefaultStoreMetricsManager {
- public static Supplier<AttributesBuilder> attributesBuilderSupplier;
- public static MessageStoreConfig messageStoreConfig;
-
- public static ObservableLongGauge storageSize = new
NopObservableLongGauge();
- public static ObservableLongGauge flushBehind = new
NopObservableLongGauge();
- public static ObservableLongGauge dispatchBehind = new
NopObservableLongGauge();
- public static ObservableLongGauge messageReserveTime = new
NopObservableLongGauge();
-
- public static ObservableLongGauge timerEnqueueLag = new
NopObservableLongGauge();
- public static ObservableLongGauge timerEnqueueLatency = new
NopObservableLongGauge();
- public static ObservableLongGauge timerDequeueLag = new
NopObservableLongGauge();
- public static ObservableLongGauge timerDequeueLatency = new
NopObservableLongGauge();
- public static ObservableLongGauge timingMessages = new
NopObservableLongGauge();
-
- public static LongCounter timerDequeueTotal = new NopLongCounter();
- public static LongCounter timerEnqueueTotal = new NopLongCounter();
- public static ObservableLongGauge timerMessageSnapshot = new
NopObservableLongGauge();
- public static LongHistogram timerMessageSetLatency = new
NopLongHistogram();
-
- public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView()
{
+ private Supplier<AttributesBuilder> attributesBuilderSupplier;
+ private MessageStoreConfig messageStoreConfig;
+
+ private ObservableLongGauge storageSize = new NopObservableLongGauge();
+ private ObservableLongGauge flushBehind = new NopObservableLongGauge();
+ private ObservableLongGauge dispatchBehind = new NopObservableLongGauge();
+ private ObservableLongGauge messageReserveTime = new
NopObservableLongGauge();
+
+ private ObservableLongGauge timerEnqueueLag = new NopObservableLongGauge();
+ private ObservableLongGauge timerEnqueueLatency = new
NopObservableLongGauge();
+ private ObservableLongGauge timerDequeueLag = new NopObservableLongGauge();
+ private ObservableLongGauge timerDequeueLatency = new
NopObservableLongGauge();
+ private ObservableLongGauge timingMessages = new NopObservableLongGauge();
+
+ private LongCounter timerDequeueTotal = new NopLongCounter();
+ private LongCounter timerEnqueueTotal = new NopLongCounter();
+ private ObservableLongGauge timerMessageSnapshot = new
NopObservableLongGauge();
+ private LongHistogram timerMessageSetLatency = new NopLongHistogram();
+
+ public DefaultStoreMetricsManager() {
+ }
+
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
List<Double> rpcCostTimeBuckets = Arrays.asList(
// day * hour * min * second
1d * 1 * 1 * 60, // 60 second
@@ -102,42 +105,42 @@ public class DefaultStoreMetricsManager {
return Lists.newArrayList(new Pair<>(selector, viewBuilder));
}
- public static void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier,
+ public void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier,
DefaultMessageStore messageStore) {
// Also add some metrics for rocksdb's monitoring.
RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier,
messageStore.getQueueStore());
- DefaultStoreMetricsManager.attributesBuilderSupplier =
attributesBuilderSupplier;
- DefaultStoreMetricsManager.messageStoreConfig =
messageStore.getMessageStoreConfig();
+ this.attributesBuilderSupplier = attributesBuilderSupplier;
+ this.messageStoreConfig = messageStore.getMessageStoreConfig();
- storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
+ this.storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
.setDescription("Broker storage size")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
- File storeDir = new
File(messageStoreConfig.getStorePathRootDir());
+ File storeDir = new
File(this.messageStoreConfig.getStorePathRootDir());
if (storeDir.exists() && storeDir.isDirectory()) {
long totalSpace = storeDir.getTotalSpace();
if (totalSpace > 0) {
- measurement.record(totalSpace -
storeDir.getFreeSpace(), newAttributesBuilder().build());
+ measurement.record(totalSpace -
storeDir.getFreeSpace(), this.newAttributesBuilder().build());
}
}
});
- flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND)
+ this.flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND)
.setDescription("Broker flush behind bytes")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(measurement ->
measurement.record(messageStore.flushBehindBytes(),
newAttributesBuilder().build()));
+ .buildWithCallback(measurement ->
measurement.record(messageStore.flushBehindBytes(),
this.newAttributesBuilder().build()));
- dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND)
+ this.dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND)
.setDescription("Broker dispatch behind bytes")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(measurement ->
measurement.record(messageStore.dispatchBehindBytes(),
newAttributesBuilder().build()));
+ .buildWithCallback(measurement ->
measurement.record(messageStore.dispatchBehindBytes(),
this.newAttributesBuilder().build()));
- messageReserveTime =
meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
+ this.messageReserveTime =
meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
.setDescription("Broker message reserve time")
.setUnit("milliseconds")
.ofLongs()
@@ -146,42 +149,42 @@ public class DefaultStoreMetricsManager {
if (earliestMessageTime <= 0) {
return;
}
- measurement.record(System.currentTimeMillis() -
earliestMessageTime, newAttributesBuilder().build());
+ measurement.record(System.currentTimeMillis() -
earliestMessageTime, this.newAttributesBuilder().build());
});
if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
- timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+ this.timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
.setDescription("Timer enqueue messages lag")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
-
measurement.record(timerMessageStore.getEnqueueBehindMessages(),
newAttributesBuilder().build());
+
measurement.record(timerMessageStore.getEnqueueBehindMessages(),
this.newAttributesBuilder().build());
});
- timerEnqueueLatency =
meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+ this.timerEnqueueLatency =
meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
.setDescription("Timer enqueue latency")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
-
measurement.record(timerMessageStore.getEnqueueBehindMillis(),
newAttributesBuilder().build());
+
measurement.record(timerMessageStore.getEnqueueBehindMillis(),
this.newAttributesBuilder().build());
});
- timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+ this.timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
.setDescription("Timer dequeue messages lag")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
-
measurement.record(timerMessageStore.getDequeueBehindMessages(),
newAttributesBuilder().build());
+
measurement.record(timerMessageStore.getDequeueBehindMessages(),
this.newAttributesBuilder().build());
});
- timerDequeueLatency =
meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+ this.timerDequeueLatency =
meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
.setDescription("Timer dequeue latency")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
- measurement.record(timerMessageStore.getDequeueBehind(),
newAttributesBuilder().build());
+ measurement.record(timerMessageStore.getDequeueBehind(),
this.newAttributesBuilder().build());
});
- timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+ this.timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
.setDescription("Current message number in timing")
.ofLongs()
.buildWithCallback(measurement -> {
@@ -191,23 +194,23 @@ public class DefaultStoreMetricsManager {
.forEach((topic, metric) -> {
measurement.record(
metric.getCount().get(),
- newAttributesBuilder().put(LABEL_TOPIC,
topic).build()
+ this.newAttributesBuilder().put(LABEL_TOPIC,
topic).build()
);
});
});
- timerDequeueTotal =
meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+ this.timerDequeueTotal =
meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
.setDescription("Total number of timer dequeue")
.build();
- timerEnqueueTotal =
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+ this.timerEnqueueTotal =
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
.setDescription("Total number of timer enqueue")
.build();
- timerMessageSnapshot =
meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
+ this.timerMessageSnapshot =
meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
.setDescription("Timer message distribution snapshot, only
count timing messages in 24h.")
.ofLongs()
.buildWithCallback(measurement -> {
TimerMetrics timerMetrics =
messageStore.getTimerMessageStore().getTimerMetrics();
TimerWheel timerWheel =
messageStore.getTimerMessageStore().getTimerWheel();
- int precisionMs = messageStoreConfig.getTimerPrecisionMs();
+ int precisionMs =
this.messageStoreConfig.getTimerPrecisionMs();
List<Integer> timerDist = timerMetrics.getTimerDistList();
long currTime = System.currentTimeMillis() / precisionMs *
precisionMs;
for (int i = 0; i < timerDist.size(); i++) {
@@ -218,10 +221,10 @@ public class DefaultStoreMetricsManager {
Slot slotEach = timerWheel.getSlot(currTime +
(long) j * precisionMs);
periodTotal += slotEach.num;
}
- measurement.record(periodTotal,
newAttributesBuilder().put(LABEL_TIMING_BOUND,
timerDist.get(i).toString()).build());
+ measurement.record(periodTotal,
this.newAttributesBuilder().put(LABEL_TIMING_BOUND,
timerDist.get(i).toString()).build());
}
});
- timerMessageSetLatency =
meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
+ this.timerMessageSetLatency =
meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
.setDescription("Timer message set latency distribution")
.setUnit("seconds")
.ofLongs()
@@ -229,26 +232,96 @@ public class DefaultStoreMetricsManager {
}
}
- public static void incTimerDequeueCount(String topic) {
- timerDequeueTotal.add(1, newAttributesBuilder()
+ public void incTimerDequeueCount(String topic) {
+ this.timerDequeueTotal.add(1, this.newAttributesBuilder()
.put(LABEL_TOPIC, topic)
.build());
}
- public static void incTimerEnqueueCount(String topic) {
- AttributesBuilder attributesBuilder = newAttributesBuilder();
+ public void incTimerEnqueueCount(String topic) {
+ AttributesBuilder attributesBuilder = this.newAttributesBuilder();
if (topic != null) {
attributesBuilder.put(LABEL_TOPIC, topic);
}
- timerEnqueueTotal.add(1, attributesBuilder.build());
+ this.timerEnqueueTotal.add(1, attributesBuilder.build());
}
- public static AttributesBuilder newAttributesBuilder() {
- if (attributesBuilderSupplier == null) {
+ public AttributesBuilder newAttributesBuilder() {
+ if (this.attributesBuilderSupplier == null) {
return Attributes.builder();
}
- return attributesBuilderSupplier.get()
+ return this.attributesBuilderSupplier.get()
.put(LABEL_STORAGE_TYPE, DEFAULT_STORAGE_TYPE)
.put(LABEL_STORAGE_MEDIUM, DEFAULT_STORAGE_MEDIUM);
}
+
+ // Getter methods for external access
+ public Supplier<AttributesBuilder> getAttributesBuilderSupplier() {
+ return attributesBuilderSupplier;
+ }
+
+ public MessageStoreConfig getMessageStoreConfig() {
+ return messageStoreConfig;
+ }
+
+ public ObservableLongGauge getStorageSize() {
+ return storageSize;
+ }
+
+ public ObservableLongGauge getFlushBehind() {
+ return flushBehind;
+ }
+
+ public ObservableLongGauge getDispatchBehind() {
+ return dispatchBehind;
+ }
+
+ public ObservableLongGauge getMessageReserveTime() {
+ return messageReserveTime;
+ }
+
+ public ObservableLongGauge getTimerEnqueueLag() {
+ return timerEnqueueLag;
+ }
+
+ public ObservableLongGauge getTimerEnqueueLatency() {
+ return timerEnqueueLatency;
+ }
+
+ public ObservableLongGauge getTimerDequeueLag() {
+ return timerDequeueLag;
+ }
+
+ public ObservableLongGauge getTimerDequeueLatency() {
+ return timerDequeueLatency;
+ }
+
+ public ObservableLongGauge getTimingMessages() {
+ return timingMessages;
+ }
+
+ public LongCounter getTimerDequeueTotal() {
+ return timerDequeueTotal;
+ }
+
+ public LongCounter getTimerEnqueueTotal() {
+ return timerEnqueueTotal;
+ }
+
+ public ObservableLongGauge getTimerMessageSnapshot() {
+ return timerMessageSnapshot;
+ }
+
+ public LongHistogram getTimerMessageSetLatency() {
+ return timerMessageSetLatency;
+ }
+
+ // Setter methods for testing
+ public void setAttributesBuilderSupplier(Supplier<AttributesBuilder>
attributesBuilderSupplier) {
+ this.attributesBuilderSupplier = attributesBuilderSupplier;
+ }
+
+ public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
+ this.messageStoreConfig = messageStoreConfig;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index d6af7b84e7..1f51a063d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -701,9 +701,13 @@ public class TimerMessageStore {
return false;
}
}
- Attributes attributes =
DefaultStoreMetricsManager.newAttributesBuilder()
+ // Record timer message set latency
+ if (messageStore instanceof DefaultMessageStore) {
+ DefaultStoreMetricsManager metricsManager =
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager();
+ Attributes attributes =
metricsManager.newAttributesBuilder()
.put(DefaultStoreMetricsConstant.LABEL_TOPIC,
msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
-
DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime -
msgExt.getBornTimestamp()) / 1000, attributes);
+
metricsManager.getTimerMessageSetLatency().record((delayedTime -
msgExt.getBornTimestamp()) / 1000, attributes);
+ }
}
} catch (Exception e) {
// here may cause the message loss
@@ -1366,7 +1370,9 @@ public class TimerMessageStore {
protected void putMessageToTimerWheel(TimerRequest req) {
try {
perfCounterTicks.startTick(ENQUEUE_PUT);
-
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
+ if (messageStore instanceof DefaultMessageStore) {
+ ((DefaultMessageStore)
messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg()));
+ }
if (shouldRunningDequeue && req.getDelayTime() <
currWriteTimeMs) {
req.setEnqueueTime(Long.MAX_VALUE);
dequeuePutQueue.put(req);
@@ -1502,7 +1508,9 @@ public class TimerMessageStore {
perfCounterTicks.startTick(DEQUEUE_PUT);
MessageExt msgExt = tr.getMsg();
-
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+ if (messageStore instanceof
DefaultMessageStore) {
+ ((DefaultMessageStore)
messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt));
+ }
if (tr.getEnqueueTime() == Long.MAX_VALUE) {
// Never enqueue, mark it.