This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6d7513425c [RIP-46] Enhanced metrics for timing and transactional 
messages (#7500)
6d7513425c is described below

commit 6d7513425c2aeb17e527be9d0d98d47f7251927d
Author: Ji Juntao <juntao....@alibaba-inc.com>
AuthorDate: Mon Jan 22 16:56:05 2024 +0800

    [RIP-46] Enhanced metrics for timing and transactional messages (#7500)
    
    * add request codes' distribution and timing messages' distribution
    
    * remove the requestCode distribution.
    
    * add delay message latency distribution.
    
    * add transaction metrics
    
    * transaction metric of topics finished, v1.
    
    * add the transaction metrics, to be tested.
    
    * fix the judgement of putMessageResult
    
    * optimize.
    
    * add config.
    
    * fix test case.
    
    * add unit tests for transactionMetrics.
    
    * remove Chinese character
    
    * add rocksdb metrics.
    
    * add more rocksdb metrics.
    
    * fix NPE
    
    * avoid the case where the total time is 0.
    
    * add license
    
    * remove useless import.
---
 .../apache/rocketmq/broker/BrokerController.java   |   9 +
 .../rocketmq/broker/BrokerPathConfigHelper.java    |   3 +
 .../broker/metrics/BrokerMetricsConstant.java      |   5 +
 .../broker/metrics/BrokerMetricsManager.java       |  72 +++++-
 .../broker/processor/EndTransactionProcessor.java  |  18 ++
 .../broker/processor/SendMessageProcessor.java     |  26 ++-
 .../broker/transaction/TransactionMetrics.java     | 259 +++++++++++++++++++++
 .../TransactionMetricsFlushService.java            |  55 +++++
 .../transaction/TransactionalMessageService.java   |   5 +
 .../DefaultTransactionalMessageCheckListener.java  |   2 +
 .../queue/TransactionalMessageServiceImpl.java     |  19 ++
 .../processor/EndTransactionProcessorTest.java     |   5 +
 .../transaction/queue/TransactionMetricsTest.java  |  83 +++++++
 .../util/TransactionalMessageServiceImpl.java      |  11 +
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../org/apache/rocketmq/common/ConfigManager.java  |  11 +-
 .../common/config/RocksDBConfigManager.java        |  13 +-
 .../common/metrics/NopObservableDoubleGauge.java   |  22 ++
 .../remoting/metrics/RemotingMetricsConstant.java  |   1 -
 .../apache/rocketmq/store/RocksDBMessageStore.java |  12 +
 .../store/metrics/DefaultStoreMetricsConstant.java |  12 +
 .../store/metrics/DefaultStoreMetricsManager.java  |  63 ++++-
 .../store/metrics/RocksDBStoreMetricsManager.java  | 154 ++++++++++++
 .../store/queue/RocksDBConsumeQueueStore.java      |   4 +
 .../rocketmq/store/timer/TimerMessageStore.java    |   5 +
 25 files changed, 856 insertions(+), 23 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8d29d44383..af90e5f87e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -104,6 +104,7 @@ import 
org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
 import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
 import 
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
 import 
org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
@@ -277,6 +278,7 @@ public class BrokerController {
     private BrokerMetricsManager brokerMetricsManager;
     private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
     private ColdDataCgCtrService coldDataCgCtrService;
+    private TransactionMetricsFlushService transactionMetricsFlushService;
 
     public BrokerController(
         final BrokerConfig brokerConfig,
@@ -963,6 +965,9 @@ public class BrokerController {
         }
         this.transactionalMessageCheckListener.setBrokerController(this);
         this.transactionalMessageCheckService = new 
TransactionalMessageCheckService(this);
+        this.transactionMetricsFlushService = new 
TransactionMetricsFlushService(this);
+        this.transactionMetricsFlushService.start();
+
     }
 
     private void initialAcl() {
@@ -1440,6 +1445,10 @@ public class BrokerController {
             this.endTransactionExecutor.shutdown();
         }
 
+        if (this.transactionMetricsFlushService != null) {
+            this.transactionMetricsFlushService.shutdown();
+        }
+
         if (this.escapeBridge != null) {
             escapeBridge.shutdown();
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index cea321ef78..0b2f52f32e 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -60,6 +60,9 @@ public class BrokerPathConfigHelper {
     public static String getTimerMetricsPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + 
"timermetrics";
     }
+    public static String getTransactionMetricsPath(final String rootDir) {
+        return rootDir + File.separator + "config" + File.separator + 
"transactionMetrics";
+    }
 
     public static String getConsumerFilterPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + 
"consumerFilter.json";
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index 73b40f6ba5..5733aa40ba 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -38,6 +38,11 @@ public class BrokerMetricsConstant {
     public static final String GAUGE_CONSUMER_READY_MESSAGES = 
"rocketmq_consumer_ready_messages";
     public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = 
"rocketmq_send_to_dlq_messages_total";
 
+    public static final String COUNTER_COMMIT_MESSAGES_TOTAL = 
"rocketmq_commit_messages_total";
+    public static final String COUNTER_ROLLBACK_MESSAGES_TOTAL = 
"rocketmq_rollback_messages_total";
+    public static final String HISTOGRAM_FINISH_MSG_LATENCY = 
"rocketmq_finish_message_latency";
+    public static final String GAUGE_HALF_MESSAGES = "rocketmq_half_messages";
+
     public static final String LABEL_CLUSTER_NAME = "cluster";
     public static final String LABEL_NODE_TYPE = "node_type";
     public static final String NODE_TYPE_BROKER = "broker";
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 307fc02ef0..fc7e97bda9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -40,13 +40,6 @@ import io.opentelemetry.sdk.metrics.export.MetricExporter;
 import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
 import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
 import io.opentelemetry.sdk.resources.Resource;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -68,12 +61,23 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_COMMIT_MESSAGES_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_ROLLBACK_MESSAGES_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
@@ -83,8 +87,10 @@ import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CON
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_HALF_MESSAGES;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
@@ -141,6 +147,10 @@ public class BrokerMetricsManager {
     public static ObservableLongGauge consumerQueueingLatency = new 
NopObservableLongGauge();
     public static ObservableLongGauge consumerReadyMessages = new 
NopObservableLongGauge();
     public static LongCounter sendToDlqMessages = new NopLongCounter();
+    public static ObservableLongGauge halfMessages = new 
NopObservableLongGauge();
+    public static LongCounter commitMessagesTotal = new NopLongCounter();
+    public static LongCounter rollBackMessagesTotal = new NopLongCounter();
+    public static LongHistogram transactionFinishLatency = new 
NopLongHistogram();
 
     public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new 
ArrayList<String>() {
         {
@@ -348,6 +358,7 @@ public class BrokerMetricsManager {
         initRequestMetrics();
         initConnectionMetrics();
         initLagAndDlqMetrics();
+        initTransactionMetrics();
         initOtherMetrics();
     }
 
@@ -361,6 +372,15 @@ public class BrokerMetricsManager {
             2d * 1024 * 1024, //2MB
             4d * 1024 * 1024 //4MB
         );
+
+        List<Double> commitLatencyBuckets = Arrays.asList(
+                1d * 1 * 1 * 5, //5s
+                1d * 1 * 1 * 60, //1min
+                1d * 1 * 10 * 60, //10min
+                1d * 1 * 60 * 60, //1h
+                1d * 12 * 60 * 60, //12h
+                1d * 24 * 60 * 60 //24h
+        );
         InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
             .setType(InstrumentType.HISTOGRAM)
             .setName(HISTOGRAM_MESSAGE_SIZE)
@@ -371,6 +391,16 @@ public class BrokerMetricsManager {
         SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
         providerBuilder.registerView(messageSizeSelector, 
messageSizeViewBuilder.build());
 
+        InstrumentSelector commitLatencySelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_FINISH_MSG_LATENCY)
+            .build();
+        ViewBuilder commitLatencyViewBuilder = View.builder()
+            
.setAggregation(Aggregation.explicitBucketHistogram(commitLatencyBuckets));
+        // To config the cardinalityLimit for openTelemetry metrics exporting.
+        SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
+        providerBuilder.registerView(commitLatencySelector, 
commitLatencyViewBuilder.build());
+
         for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : 
RemotingMetricsManager.getMetricsView()) {
             ViewBuilder viewBuilder = selectorViewPair.getObject2();
             SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
@@ -560,6 +590,34 @@ public class BrokerMetricsManager {
             .build();
     }
 
+    private void initTransactionMetrics() {
+        commitMessagesTotal = 
brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
+                .setDescription("Total number of commit messages")
+                .build();
+
+        rollBackMessagesTotal = 
brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
+                .setDescription("Total number of rollback messages")
+                .build();
+
+        transactionFinishLatency = 
brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
+                .setDescription("Transaction finish latency")
+                .ofLongs()
+                .setUnit("ms")
+                .build();
+
+        halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
+                .setDescription("Half messages of all topics")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
+                            .forEach((topic, metric) -> {
+                                measurement.record(
+                                        metric.getCount().get(),
+                                        
newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, 
topic).build()
+                                );
+                            });
+                });
+    }
     private void initOtherMetrics() {
         RemotingMetricsManager.initMetrics(brokerMeter, 
BrokerMetricsManager::newAttributesBuilder);
         messageStore.initMetrics(brokerMeter, 
BrokerMetricsManager::newAttributesBuilder);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index f6aa0d48c9..e812a53ba7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.transaction.OperationResult;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.TopicFilterType;
@@ -40,6 +41,8 @@ import 
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 /**
  * EndTransaction processor: process commit and rollback message
  */
@@ -144,6 +147,16 @@ public class EndTransactionProcessor implements 
NettyRequestProcessor {
                     RemotingCommand sendResult = sendFinalMessage(msgInner);
                     if (sendResult.getCode() == ResponseCode.SUCCESS) {
                         
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
+                        // successful committed, then total num of 
half-messages minus 1
+                        
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(),
 -1);
+                        BrokerMetricsManager.commitMessagesTotal.add(1, 
BrokerMetricsManager.newAttributesBuilder()
+                                .put(LABEL_TOPIC, msgInner.getTopic())
+                                .build());
+                        // record the commit latency.
+                        Long commitLatency = (System.currentTimeMillis() - 
result.getPrepareMessage().getBornTimestamp()) / 1000;
+                        
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, 
BrokerMetricsManager.newAttributesBuilder()
+                                .put(LABEL_TOPIC, msgInner.getTopic())
+                                .build());
                     }
                     return sendResult;
                 }
@@ -161,6 +174,11 @@ public class EndTransactionProcessor implements 
NettyRequestProcessor {
                 RemotingCommand res = 
checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                 if (res.getCode() == ResponseCode.SUCCESS) {
                     
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
+                    // roll back, then total num of half-messages minus 1
+                    
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC),
 -1);
+                    BrokerMetricsManager.rollBackMessagesTotal.add(1, 
BrokerMetricsManager.newAttributesBuilder()
+                            .put(LABEL_TOPIC, 
result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
+                            .build());
                 }
                 return res;
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 4ec84c1461..912d502eab 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -18,11 +18,6 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.opentelemetry.api.common.Attributes;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
@@ -65,10 +60,17 @@ import org.apache.rocketmq.store.AppendMessageResult;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
@@ -300,7 +302,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
 
         // Map<String, String> oriProps = 
MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String traFlag = 
oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-        boolean sendTransactionPrepareMessage = false;
+        boolean sendTransactionPrepareMessage;
         if (Boolean.parseBoolean(traFlag)
             && !(msgInner.getReconsumeTimes() > 0 && 
msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
             if 
(this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
@@ -311,6 +313,8 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 return response;
             }
             sendTransactionPrepareMessage = true;
+        } else {
+            sendTransactionPrepareMessage = false;
         }
 
         long beginTimeMillis = this.brokerController.getMessageStore().now();
@@ -332,6 +336,12 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 if (responseFuture != null) {
                     doResponse(ctx, request, responseFuture);
                 }
+
+                // record the transaction metrics, responseFuture == null 
means put successfully
+                if (sendTransactionPrepareMessage && (responseFuture == null 
|| responseFuture.getCode() == ResponseCode.SUCCESS)) {
+                    
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC),
 1);
+                }
+
                 sendMessageCallback.onComplete(sendMessageContext, response);
             }, this.brokerController.getPutMessageFutureExecutor());
             // Returns null to release the send message thread
@@ -344,6 +354,10 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
             }
             handlePutMessageResult(putMessageResult, response, request, 
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, 
mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
+            // record the transaction metrics
+            if (putMessageResult.getPutMessageStatus() == 
PutMessageStatus.PUT_OK && putMessageResult.getAppendMessageResult().isOk()) {
+                
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC),
 1);
+            }
             sendMessageCallback.onComplete(sendMessageContext, response);
             return response;
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
new file mode 100644
index 0000000000..ad30c73c60
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.transaction;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.google.common.io.Files;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class TransactionMetrics extends ConfigManager {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private ConcurrentMap<String, Metric> transactionCounts =
+            new ConcurrentHashMap<>(1024);
+
+    private DataVersion dataVersion = new DataVersion();
+
+    private final String configPath;
+
+    public TransactionMetrics(String configPath) {
+        this.configPath = configPath;
+    }
+
+    public long addAndGet(String topic, int value) {
+        Metric pair = getTopicPair(topic);
+        getDataVersion().nextVersion();
+        pair.setTimeStamp(System.currentTimeMillis());
+        return pair.getCount().addAndGet(value);
+    }
+
+    public Metric getTopicPair(String topic) {
+        Metric pair = transactionCounts.get(topic);
+        if (null != pair) {
+            return pair;
+        }
+        pair = new Metric();
+        final Metric previous = transactionCounts.putIfAbsent(topic, pair);
+        if (null != previous) {
+            return previous;
+        }
+        return pair;
+    }
+    public long getTransactionCount(String topic) {
+        Metric pair = transactionCounts.get(topic);
+        if (null == pair) {
+            return 0;
+        } else {
+            return pair.getCount().get();
+        }
+    }
+
+    public Map<String, Metric> getTransactionCounts() {
+        return transactionCounts;
+    }
+    public void setTransactionCounts(ConcurrentMap<String, Metric> 
transactionCounts) {
+        this.transactionCounts = transactionCounts;
+    }
+
+    protected void write0(Writer writer) {
+        TransactionMetricsSerializeWrapper wrapper = new 
TransactionMetricsSerializeWrapper();
+        wrapper.setTransactionCount(transactionCounts);
+        wrapper.setDataVersion(dataVersion);
+        JSON.writeJSONString(writer, wrapper, 
SerializerFeature.BrowserCompatible);
+    }
+
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return configPath;
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            TransactionMetricsSerializeWrapper 
transactionMetricsSerializeWrapper =
+                    TransactionMetricsSerializeWrapper.fromJson(jsonString, 
TransactionMetricsSerializeWrapper.class);
+            if (transactionMetricsSerializeWrapper != null) {
+                
this.transactionCounts.putAll(transactionMetricsSerializeWrapper.getTransactionCount());
+                
this.dataVersion.assignNewOne(transactionMetricsSerializeWrapper.getDataVersion());
+            }
+        }
+    }
+
+    @Override
+    public String encode(boolean prettyFormat) {
+        TransactionMetricsSerializeWrapper metricsSerializeWrapper = new 
TransactionMetricsSerializeWrapper();
+        metricsSerializeWrapper.setDataVersion(this.dataVersion);
+        metricsSerializeWrapper.setTransactionCount(this.transactionCounts);
+        return metricsSerializeWrapper.toJson(prettyFormat);
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+
+    public void cleanMetrics(Set<String> topics) {
+        if (topics == null || topics.isEmpty()) {
+            return;
+        }
+        Iterator<Map.Entry<String, Metric>> iterator = 
transactionCounts.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, Metric> entry = iterator.next();
+            final String topic = entry.getKey();
+            if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) {
+                continue;
+            }
+            if (!topics.contains(topic)) {
+                continue;
+            }
+            // in the input topics set, then remove it.
+            iterator.remove();
+        }
+    }
+
+    public static class TransactionMetricsSerializeWrapper extends 
RemotingSerializable {
+        private ConcurrentMap<String, Metric> transactionCount =
+                new ConcurrentHashMap<>(1024);
+        private DataVersion dataVersion = new DataVersion();
+
+        public ConcurrentMap<String, Metric> getTransactionCount() {
+            return transactionCount;
+        }
+
+        public void setTransactionCount(
+                ConcurrentMap<String, Metric> transactionCount) {
+            this.transactionCount = transactionCount;
+        }
+
+        public DataVersion getDataVersion() {
+            return dataVersion;
+        }
+
+        public void setDataVersion(DataVersion dataVersion) {
+            this.dataVersion = dataVersion;
+        }
+    }
+
+    @Override
+    public synchronized void persist() {
+        String config = configFilePath();
+        String temp = config + ".tmp";
+        String backup = config + ".bak";
+        BufferedWriter bufferedWriter = null;
+        try {
+            File tmpFile = new File(temp);
+            File parentDirectory = tmpFile.getParentFile();
+            if (!parentDirectory.exists()) {
+                if (!parentDirectory.mkdirs()) {
+                    log.error("Failed to create directory: {}", 
parentDirectory.getCanonicalPath());
+                    return;
+                }
+            }
+
+            if (!tmpFile.exists()) {
+                if (!tmpFile.createNewFile()) {
+                    log.error("Failed to create file: {}", 
tmpFile.getCanonicalPath());
+                    return;
+                }
+            }
+            bufferedWriter = new BufferedWriter(new OutputStreamWriter(new 
FileOutputStream(tmpFile, false),
+                    StandardCharsets.UTF_8));
+            write0(bufferedWriter);
+            bufferedWriter.flush();
+            bufferedWriter.close();
+            log.debug("Finished writing tmp file: {}", temp);
+
+            File configFile = new File(config);
+            if (configFile.exists()) {
+                Files.copy(configFile, new File(backup));
+                configFile.delete();
+            }
+
+            tmpFile.renameTo(configFile);
+        } catch (IOException e) {
+            log.error("Failed to persist {}", temp, e);
+        } finally {
+            if (null != bufferedWriter) {
+                try {
+                    bufferedWriter.close();
+                } catch (IOException ignore) {
+                }
+            }
+        }
+    }
+
+    public static class Metric {
+        private AtomicLong count;
+        private long timeStamp;
+
+        public Metric() {
+            count = new AtomicLong(0);
+            timeStamp = System.currentTimeMillis();
+        }
+
+        public AtomicLong getCount() {
+            return count;
+        }
+
+        public void setCount(AtomicLong count) {
+            this.count = count;
+        }
+
+        public long getTimeStamp() {
+            return timeStamp;
+        }
+
+        public void setTimeStamp(long timeStamp) {
+            this.timeStamp = timeStamp;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("[%d,%d]", count.get(), timeStamp);
+        }
+    }
+
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java
new file mode 100644
index 0000000000..948f9fbc8e
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+/**
+ * Background service that periodically persists the broker's transaction metrics.
+ * <p>
+ * The flush period is read from {@code BrokerConfig#getTransactionMetricFlushInterval()} on every
+ * iteration, so a changed value is picked up without restarting the service. Whenever a flush is
+ * not yet due, the thread waits through {@code waitForRunning} for the remaining time rather than
+ * re-checking the clock in a tight loop.
+ */
+public class TransactionMetricsFlushService extends ServiceThread {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+    private final BrokerController brokerController;
+
+    /**
+     * @param brokerController broker that supplies the flush interval and the transaction metrics.
+     */
+    public TransactionMetricsFlushService(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public String getServiceName() {
+        return "TransactionFlushService";
+    }
+
+    /**
+     * Persists the transaction metrics each time more than one flush interval has elapsed since
+     * the previous flush (or since the service started), until the service is stopped.
+     * Any error raised by a single iteration is logged and the loop keeps running.
+     */
+    @Override
+    public void run() {
+        log.info(this.getServiceName() + " service start");
+        long lastFlush = System.currentTimeMillis();
+        while (!this.isStopped()) {
+            try {
+                long interval = brokerController.getBrokerConfig().getTransactionMetricFlushInterval();
+                long elapsed = System.currentTimeMillis() - lastFlush;
+                if (elapsed <= interval) {
+                    // Not due yet: wait for the remaining time instead of spinning. The lower bound of
+                    // 1 ms avoids a zero-length wait when the deadline is reached exactly.
+                    waitForRunning(Math.max(1, interval - elapsed));
+                    continue;
+                }
+                // Record the flush time before persisting, so that a failing persist is retried
+                // after a full interval rather than immediately.
+                lastFlush = System.currentTimeMillis();
+                brokerController.getTransactionalMessageService().getTransactionMetrics().persist();
+            } catch (Throwable e) {
+                log.error("Error occurred in " + getServiceName(), e);
+            }
+        }
+        log.info(this.getServiceName() + " service end");
+    }
+}
\ No newline at end of file
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
index 8dbbf980eb..849e64024b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.transaction;
 
 import java.util.concurrent.CompletableFuture;
+
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import 
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
@@ -87,4 +88,8 @@ public interface TransactionalMessageService {
      * Close transaction service.
      */
     void close();
+
+    /**
+     * Get the transaction metrics held by this service.
+     *
+     * @return the transaction metrics of this service.
+     */
+    TransactionMetrics getTransactionMetrics();
+
+    /**
+     * Set the transaction metrics used by this service.
+     *
+     * @param transactionMetrics the transaction metrics to use.
+     */
+    void setTransactionMetrics(TransactionMetrics transactionMetrics);
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index ad02ae4270..8e2b679b40 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -49,6 +49,8 @@ public class DefaultTransactionalMessageCheckListener extends 
AbstractTransactio
             if (putMessageResult != null && 
putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                 log.info("Put checked-too-many-time half message to 
TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
                     "commitLogOffset={}, real topic={}", 
msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), 
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
+                // discarded, then the num of half-messages minus 1
+                
this.getBrokerController().getTransactionalMessageService().getTransactionMetrics().addAndGet(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
 -1);
             } else {
                 log.error("Put checked-too-many-time half message to 
TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), 
msgExt.getMsgId());
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index 48db828e0a..9fdfd0a710 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -27,8 +27,11 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import 
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -70,10 +73,25 @@ public class TransactionalMessageServiceImpl implements 
TransactionalMessageServ
 
     private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new 
ConcurrentHashMap<>();
 
+    private TransactionMetrics transactionMetrics;
+
     public TransactionalMessageServiceImpl(TransactionalMessageBridge 
transactionBridge) {
         this.transactionalMessageBridge = transactionBridge;
         transactionalOpBatchService = new 
TransactionalOpBatchService(transactionalMessageBridge.getBrokerController(), 
this);
         transactionalOpBatchService.start();
+        transactionMetrics = new 
TransactionMetrics(BrokerPathConfigHelper.getTransactionMetricsPath(
+                
transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getStorePathRootDir()));
+        transactionMetrics.load();
+    }
+
+    @Override
+    public TransactionMetrics getTransactionMetrics() {
+        return transactionMetrics;
+    }
+
+    @Override
+    public void setTransactionMetrics(TransactionMetrics transactionMetrics) {
+        this.transactionMetrics = transactionMetrics;
     }
 
 
@@ -632,6 +650,7 @@ public class TransactionalMessageServiceImpl implements 
TransactionalMessageServ
         if (this.transactionalOpBatchService != null) {
             this.transactionalOpBatchService.shutdown();
         }
+        this.getTransactionMetrics().persist();
     }
 
     public Message getOpMessage(int queueId, String moreData) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index 72b339ae73..a364a1bbee 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -71,8 +72,12 @@ public class EndTransactionProcessorTest {
     @Mock
     private TransactionalMessageService transactionMsgService;
 
+    @Mock
+    private TransactionMetrics transactionMetrics;
+
     @Before
     public void init() {
+        
when(transactionMsgService.getTransactionMetrics()).thenReturn(transactionMetrics);
         brokerController.setMessageStore(messageStore);
         brokerController.setTransactionalMessageService(transactionMsgService);
         endTransactionProcessor = new 
EndTransactionProcessor(brokerController);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
new file mode 100644
index 0000000000..690b4eabb5
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction.queue;
+
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collections;
+
+
+/**
+ * Unit tests for {@link TransactionMetrics}.
+ * <p>
+ * The checks use JUnit's {@link Assert} rather than the {@code assert} keyword, so they are
+ * evaluated even when the JVM runs without {@code -ea}, and the calls under test are never
+ * skipped together with a disabled assertion.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TransactionMetricsTest {
+    private TransactionMetrics transactionMetrics;
+    private String configPath;
+
+    @Before
+    public void setUp() throws Exception {
+        configPath = "configPath";
+        transactionMetrics = new TransactionMetrics(configPath);
+    }
+
+    /**
+     * Adding to a topic that has no count yet returns exactly the added value.
+     */
+    @Test
+    public void testAddAndGet() {
+        String topic = "testAddAndGet";
+        int value = 10;
+        long result = transactionMetrics.addAndGet(topic, value);
+
+        Assert.assertEquals(value, result);
+    }
+
+    /**
+     * Looking up the metric of a topic never yields {@code null}.
+     */
+    @Test
+    public void testGetTopicPair() {
+        String topic = "getTopicPair";
+        Metric result = transactionMetrics.getTopicPair(topic);
+        Assert.assertNotNull(result);
+    }
+
+    /**
+     * The count of a known topic is the value added to it; an unknown topic counts as zero.
+     */
+    @Test
+    public void testGetTransactionCount() {
+        String topicExist = "topicExist";
+        String topicNotExist = "topicNotExist";
+
+        transactionMetrics.addAndGet(topicExist, 10);
+
+        long existingCount = transactionMetrics.getTransactionCount(topicExist);
+        long missingCount = transactionMetrics.getTransactionCount(topicNotExist);
+        Assert.assertEquals(10L, existingCount);
+        Assert.assertEquals(0L, missingCount);
+    }
+
+
+    /**
+     * Cleaning the metrics of a topic resets its count to zero.
+     */
+    @Test
+    public void testCleanMetrics() {
+        String topic = "testCleanMetrics";
+        int value = 10;
+        // Call addAndGet outside of the assertion so the setup always runs.
+        long added = transactionMetrics.addAndGet(topic, value);
+        Assert.assertEquals(value, added);
+
+        transactionMetrics.cleanMetrics(Collections.singleton(topic));
+
+        long remaining = transactionMetrics.getTransactionCount(topic);
+        Assert.assertEquals(0L, remaining);
+    }
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
index 3cbfab2c27..2de4c307e0 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util;
 import java.util.concurrent.CompletableFuture;
 import 
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -70,4 +71,14 @@ public class TransactionalMessageServiceImpl implements 
TransactionalMessageServ
     public void close() {
 
     }
+
+    @Override
+    public TransactionMetrics getTransactionMetrics() {
+        return null;
+    }
+
+    @Override
+    public void setTransactionMetrics(TransactionMetrics transactionMetrics) {
+
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index bedc7f386b..0a2c528f86 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -269,6 +269,8 @@ public class BrokerConfig extends BrokerIdentity {
     @ImportantField
     private long transactionCheckInterval = 30 * 1000;
 
+    private long transactionMetricFlushInterval = 3 * 1000;
+
     /**
      * transaction batch op message
      */
@@ -1789,4 +1791,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setSplitRegistrationSize(int splitRegistrationSize) {
         this.splitRegistrationSize = splitRegistrationSize;
     }
+
+    public long getTransactionMetricFlushInterval() {
+        return transactionMetricFlushInterval;
+    }
+
+    public void setTransactionMetricFlushInterval(long 
transactionMetricFlushInterval) {
+        this.transactionMetricFlushInterval = transactionMetricFlushInterval;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java 
b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
index 6c3bed47cf..5e99759619 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -16,13 +16,14 @@
  */
 package org.apache.rocketmq.common;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.rocketmq.common.config.RocksDBConfigManager;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.rocksdb.Statistics;
+
+import java.io.IOException;
+import java.util.Map;
 
 public abstract class ConfigManager {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -103,4 +104,8 @@ public abstract class ConfigManager {
     public abstract String encode(final boolean prettyFormat);
 
     public abstract void decode(final String jsonString);
+
+    /**
+     * Returns the statistics exposed by the RocksDB config manager.
+     *
+     * @return the statistics, or {@code null} when no RocksDB config manager is present.
+     */
+    public Statistics getStatistics() {
+        if (rocksDBConfigManager == null) {
+            return null;
+        }
+        return rocksDBConfigManager.getStatistics();
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
index f958bbdf0b..d1ec894685 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
@@ -16,15 +16,16 @@
  */
 package org.apache.rocketmq.common.config;
 
-import java.util.function.BiConsumer;
-
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
+import java.util.function.BiConsumer;
+
 public class RocksDBConfigManager {
     protected static final Logger BROKER_LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -105,4 +106,12 @@ public class RocksDBConfigManager {
     public void batchPutWithWal(final WriteBatch batch) throws Exception {
         this.configRocksDBStorage.batchPutWithWal(batch);
     }
+
+    /**
+     * Returns the statistics exposed by the underlying RocksDB config storage.
+     *
+     * @return the statistics, or {@code null} when the storage has not been created.
+     */
+    public Statistics getStatistics() {
+        return this.configRocksDBStorage == null ? null : this.configRocksDBStorage.getStatistics();
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java
 
b/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java
new file mode 100644
index 0000000000..899ac14a9a
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.metrics;
+
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+
+/**
+ * An {@link ObservableDoubleGauge} implementation with an empty body.
+ * <p>
+ * NOTE(review): presumably a placeholder default for gauge fields until a real gauge is
+ * registered, like the other {@code Nop*} metric classes - confirm against the callers.
+ */
+public class NopObservableDoubleGauge implements ObservableDoubleGauge {
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
index 730469e590..f9b3e4c6fa 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.remoting.metrics;
 
 public class RemotingMetricsConstant {
     public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency";
-
     public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
     public static final String LABEL_REQUEST_CODE = "request_code";
     public static final String LABEL_RESPONSE_CODE = "response_code";
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 87ccb5474b..6141b778bf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -19,12 +19,17 @@ package org.apache.rocketmq.store;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.metrics.RocksDBStoreMetricsManager;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
 import org.apache.rocketmq.store.queue.RocksDBConsumeQueue;
@@ -166,4 +171,11 @@ public class RocksDBMessageStore extends 
DefaultMessageStore {
         // todo
         return 0;
     }
+
+    /**
+     * Initializes the default store metrics and, on top of them, the RocksDB-specific
+     * metrics for this store.
+     *
+     * @param meter meter handed to both metrics managers.
+     * @param attributesBuilderSupplier attributes builder supplier handed to both metrics managers.
+     */
+    @Override
+    public void initMetrics(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier) {
+        DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
+        // Also add some metrics for rocksdb's monitoring.
+        RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, 
this);
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index 271604b1e5..956501c64f 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -30,10 +30,22 @@ public class DefaultStoreMetricsConstant {
 
     public static final String COUNTER_TIMER_ENQUEUE_TOTAL = 
"rocketmq_timer_enqueue_total";
     public static final String COUNTER_TIMER_DEQUEUE_TOTAL = 
"rocketmq_timer_dequeue_total";
+    public static final String GAUGE_TIMER_MESSAGE_SNAPSHOT = 
"rocketmq_timer_message_snapshot";
+    public static final String HISTOGRAM_DELAY_MSG_LATENCY = 
"rocketmq_delay_message_latency";
 
     public static final String LABEL_STORAGE_TYPE = "storage_type";
     public static final String DEFAULT_STORAGE_TYPE = "local";
     public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
     public static final String DEFAULT_STORAGE_MEDIUM = "disk";
     public static final String LABEL_TOPIC = "topic";
+    public static final String LABEL_TIMING_BOUND = "timer_bound_s";
+    public static final String GAUGE_BYTES_ROCKSDB_WRITTEN = 
"rocketmq_rocksdb_bytes_written";
+    public static final String GAUGE_BYTES_ROCKSDB_READ = 
"rocketmq_rocksdb_bytes_read";
+
+    public static final String GAUGE_TIMES_ROCKSDB_WRITTEN_SELF = 
"rocketmq_rocksdb_times_written_self";
+    public static final String GAUGE_TIMES_ROCKSDB_WRITTEN_OTHER = 
"rocketmq_rocksdb_times_written_other";
+    public static final String GAUGE_RATE_ROCKSDB_CACHE_HIT = 
"rocketmq_rocksdb_rate_cache_hit";
+    public static final String GAUGE_TIMES_ROCKSDB_COMPRESSED = 
"rocketmq_rocksdb_times_compressed";
+    public static final String GAUGE_BYTES_READ_AMPLIFICATION = 
"rocketmq_rocksdb_read_amplification_bytes";
+    public static final String GAUGE_TIMES_ROCKSDB_READ = 
"rocketmq_rocksdb_times_read";
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 45a6bbc680..db4c7bb766 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -20,19 +20,29 @@ import com.google.common.collect.Lists;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.common.AttributesBuilder;
 import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.sdk.metrics.Aggregation;
 import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
 import io.opentelemetry.sdk.metrics.ViewBuilder;
-import java.io.File;
-import java.util.List;
-import java.util.function.Supplier;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
 import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.Slot;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.timer.TimerMetrics;
+import org.apache.rocketmq.store.timer.TimerWheel;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
 
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_DEQUEUE_TOTAL;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_ENQUEUE_TOTAL;
@@ -46,9 +56,12 @@ import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUG
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LATENCY;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LAG;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LATENCY;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_MESSAGE_SNAPSHOT;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMING_MESSAGES;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.HISTOGRAM_DELAY_MSG_LATENCY;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
 
 public class DefaultStoreMetricsManager {
@@ -68,9 +81,26 @@ public class DefaultStoreMetricsManager {
 
     public static LongCounter timerDequeueTotal = new NopLongCounter();
     public static LongCounter timerEnqueueTotal = new NopLongCounter();
+    public static ObservableLongGauge timerMessageSnapshot = new 
NopObservableLongGauge();
+    public static LongHistogram timerMessageSetLatency = new 
NopLongHistogram();
 
     public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() 
{
-        return Lists.newArrayList();
+        List<Double> rpcCostTimeBuckets = Arrays.asList(
+                // day * hour * min * second
+                1d * 1 * 1 * 60, // 60 second
+                1d * 1 * 10 * 60, // 10 min
+                1d * 1 * 60 * 60, // 1 hour
+                1d * 12 * 60 * 60, // 12 hour
+                1d * 24 * 60 * 60, // 1 day
+                3d * 24 * 60 * 60 // 3 day
+        );
+        InstrumentSelector selector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM)
+                .setName(HISTOGRAM_DELAY_MSG_LATENCY)
+                .build();
+        ViewBuilder viewBuilder = View.builder()
+                
.setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
+        return Lists.newArrayList(new Pair<>(selector, viewBuilder));
     }
 
     public static void init(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier,
@@ -168,6 +198,31 @@ public class DefaultStoreMetricsManager {
             timerEnqueueTotal = 
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
                 .setDescription("Total number of timer enqueue")
                 .build();
+            timerMessageSnapshot = 
meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
+                .setDescription("Timer message distribution snapshot, only 
count timing messages in 24h.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMetrics timerMetrics = 
messageStore.getTimerMessageStore().getTimerMetrics();
+                    TimerWheel timerWheel = 
messageStore.getTimerMessageStore().getTimerWheel();
+                    int precisionMs = messageStoreConfig.getTimerPrecisionMs();
+                    List<Integer> timerDist = timerMetrics.getTimerDistList();
+                    long currTime = System.currentTimeMillis() / precisionMs * 
precisionMs;
+                    for (int i = 0; i < timerDist.size(); i++) {
+                        int slotBeforeNum = i == 0 ? 0 : timerDist.get(i - 1) 
* 1000 / precisionMs;
+                        int slotTotalNum = timerDist.get(i) * 1000 / 
precisionMs;
+                        int periodTotal = 0;
+                        for (int j = slotBeforeNum; j < slotTotalNum; j++) {
+                            Slot slotEach = timerWheel.getSlot(currTime + 
(long) j * precisionMs);
+                            periodTotal += slotEach.num;
+                        }
+                        measurement.record(periodTotal, 
newAttributesBuilder().put(LABEL_TIMING_BOUND, 
timerDist.get(i).toString()).build());
+                    }
+                });
+            timerMessageSetLatency = 
meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
+                    .setDescription("Timer message set latency distribution")
+                    .setUnit("seconds")
+                    .ofLongs()
+                    .build();
         }
     }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
new file mode 100644
index 0000000000..6029488056
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopObservableDoubleGauge;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
+import org.apache.rocketmq.store.RocksDBMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
+import org.rocksdb.TickerType;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_MEDIUM;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_TYPE;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_BYTES_ROCKSDB_READ;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_BYTES_ROCKSDB_WRITTEN;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+
+public class RocksDBStoreMetricsManager {
+    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+    public static MessageStoreConfig messageStoreConfig;
+
+    // The cumulative number of bytes read from the database.
+    public static ObservableLongGauge bytesRocksdbRead = new 
NopObservableLongGauge();
+
+    // The cumulative number of bytes written to the database.
+    public static ObservableLongGauge bytesRocksdbWritten = new 
NopObservableLongGauge();
+
+    // The cumulative number of read operations performed.
+    public static ObservableLongGauge timesRocksdbRead = new 
NopObservableLongGauge();
+
+    // The cumulative number of write operations performed.
+    public static ObservableLongGauge timesRocksdbWrittenSelf = new 
NopObservableLongGauge();
+    public static ObservableLongGauge timesRocksdbWrittenOther = new 
NopObservableLongGauge();
+
+    // The cumulative number of compressions that have occurred.
+    public static ObservableLongGauge timesRocksdbCompressed = new 
NopObservableLongGauge();
+
+    // The ratio of the amount of data actually written to the storage medium 
to the amount of data written by the application.
+    public static ObservableDoubleGauge bytesRocksdbAmplificationRead = new 
NopObservableDoubleGauge();
+
+    // The rate at which cache lookups were served from the cache rather than 
needing to be fetched from disk.
+    public static ObservableDoubleGauge rocksdbCacheHitRate = new 
NopObservableDoubleGauge();
+
+    public static volatile long blockCacheHitTimes = 0;
+    public static volatile long blockCacheMissTimes = 0;
+
+
+
+    public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() 
{
+        return Lists.newArrayList();
+    }
+
+    public static void init(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier,
+        RocksDBMessageStore messageStore) {
+        RocksDBStoreMetricsManager.attributesBuilderSupplier = 
attributesBuilderSupplier;
+        bytesRocksdbWritten = meter.gaugeBuilder(GAUGE_BYTES_ROCKSDB_WRITTEN)
+                .setDescription("The cumulative number of bytes written to the 
database.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.BYTES_WRITTEN), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        bytesRocksdbRead = meter.gaugeBuilder(GAUGE_BYTES_ROCKSDB_READ)
+                .setDescription("The cumulative number of bytes read from the 
database.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.BYTES_READ), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        timesRocksdbWrittenSelf = 
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_WRITTEN_SELF)
+                .setDescription("The cumulative number of write operations 
performed by self.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.WRITE_DONE_BY_SELF), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        timesRocksdbWrittenOther = 
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_WRITTEN_OTHER)
+                .setDescription("The cumulative number of write operations 
performed by other.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.WRITE_DONE_BY_OTHER), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        timesRocksdbRead = 
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_READ)
+                .setDescription("The cumulative number of write operations 
performed by other.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.NUMBER_KEYS_READ), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        rocksdbCacheHitRate = 
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_RATE_ROCKSDB_CACHE_HIT)
+                .setDescription("The rate at which cache lookups were served 
from the cache rather than needing to be fetched from disk.")
+                .buildWithCallback(measurement -> {
+                    long newHitTimes = 
((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.BLOCK_CACHE_HIT);
+                    long newMissTimes = 
((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.BLOCK_CACHE_MISS);
+                    long totalPeriod = newHitTimes - blockCacheHitTimes + 
newMissTimes - blockCacheMissTimes;
+                    double hitRate = totalPeriod == 0 ? 0 : 
(double)(newHitTimes - blockCacheHitTimes) / totalPeriod;
+                    blockCacheHitTimes = newHitTimes;
+                    blockCacheMissTimes = newMissTimes;
+                    measurement.record(hitRate, 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        timesRocksdbCompressed = 
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_COMPRESSED)
+                .setDescription("The cumulative number of compressions that 
have occurred.")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.NUMBER_BLOCK_COMPRESSED), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+        bytesRocksdbAmplificationRead = 
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_BYTES_READ_AMPLIFICATION)
+                .setDescription("The rate at which cache lookups were served 
from the cache rather than needing to be fetched from disk.")
+                .buildWithCallback(measurement -> {
+                    
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+                            
.getStatistics().getTickerCount(TickerType.READ_AMP_TOTAL_READ_BYTES), 
newAttributesBuilder().put("type", "consume_queue").build());
+                });
+    }
+
+    public static AttributesBuilder newAttributesBuilder() {
+        if (attributesBuilderSupplier == null) {
+            return Attributes.builder();
+        }
+        return attributesBuilderSupplier.get()
+            .put(LABEL_STORAGE_TYPE, DEFAULT_STORAGE_TYPE)
+            .put(LABEL_STORAGE_MEDIUM, DEFAULT_STORAGE_MEDIUM);
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 78456cfcd8..4c66696e3c 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
 public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
@@ -266,6 +267,9 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         }
     }
 
+    /**
+     * Exposes the statistics object of the underlying RocksDB storage.
+     *
+     * @return the value returned by {@code rocksDBStorage.getStatistics()}
+     */
+    public Statistics getStatistics() {
+        final Statistics storageStatistics = rocksDBStorage.getStatistics();
+        return storageStatistics;
+    }
     @Override
     public List<ByteBuffer> rangeQuery(final String topic, final int queueId, 
final long startIndex, final int num) throws RocksDBException {
         return this.rocksDBConsumeQueueTable.rangeQuery(topic, queueId, 
startIndex, num);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 872cd71054..819b3e96a4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import io.opentelemetry.api.common.Attributes;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.rocketmq.common.ServiceThread;
@@ -64,6 +65,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
@@ -686,6 +688,9 @@ public class TimerMessageStore {
                                 return false;
                             }
                         }
+                        Attributes attributes = 
DefaultStoreMetricsManager.newAttributesBuilder()
+                                .put(DefaultStoreMetricsConstant.LABEL_TOPIC, 
msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
+                        
DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime - 
msgExt.getBornTimestamp()) / 1000, attributes);
                     }
                 } catch (Exception e) {
                     // here may cause the message loss

Reply via email to