This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop by this push:
     new 95fbf79  feat: add metrics for topic dimension read and write msg size (#271)
95fbf79 is described below

commit 95fbf7936a53444a135ac8ecc45742bde62e86c6
Author: yx9o <yangx_s...@163.com>
AuthorDate: Fri May 10 20:58:57 2024 +0800

    feat: add metrics for topic dimension read and write msg size (#271)
---
 .../rocketmq/mqtt/cs/session/loop/QueueCache.java  | 35 +++++++++++++++-------
 .../mqtt/ds/store/LmqQueueStoreManager.java        | 14 +++++++++
 .../ds/upstream/processor/PublishProcessor.java    | 11 ++++++-
 .../exporter/collector/MqttMetricsCollector.java   |  4 +++
 .../mqtt/exporter/collector/MqttMetricsInfo.java   |  4 ++-
 5 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index 771d09f..52f91b8 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.session.Session;
 import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -52,6 +53,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.apache.rocketmq.mqtt.cs.session.loop.PullResultStatus.DONE;
 import static org.apache.rocketmq.mqtt.cs.session.loop.PullResultStatus.LATER;
@@ -183,7 +185,7 @@ public class QueueCache {
                                         CompletableFuture<PullResult> 
callBackResult) {
         if (subscription.isP2p() || subscription.isRetry()) {
             StatUtil.addPv("NotPullCache", 1);
-            collectorPullCacheStatus("NotPullCache");
+            collectorPullCacheStatus("NotPullCache", null);
             CompletableFuture<PullResult> pullResult = 
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, 
count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -198,7 +200,7 @@ public class QueueCache {
         CacheEntry cacheEntry = cache.getIfPresent(queue);
         if (cacheEntry == null) {
             StatUtil.addPv("NoPullCache", 1);
-            collectorPullCacheStatus("NotPullCache");
+            collectorPullCacheStatus("NotPullCache", null);
             CompletableFuture<PullResult> pullResult = 
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, 
count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -206,7 +208,7 @@ public class QueueCache {
         if (cacheEntry.loading.get()) {
             if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) {
                 StatUtil.addPv("LoadPullCacheTimeout", 1);
-                collectorPullCacheStatus("LoadPullCacheTimeout");
+                collectorPullCacheStatus("LoadPullCacheTimeout", null);
                 CompletableFuture<PullResult> pullResult = 
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, 
count);
                 callbackResult(pullResult, callBackResult);
                 return DONE;
@@ -217,12 +219,12 @@ public class QueueCache {
         List<Message> cacheMsgList = cacheEntry.messageList;
         if (cacheMsgList.isEmpty()) {
             if (loadEvent.get(queue) != null) {
-                collectorPullCacheStatus("EmptyPullCacheLATER");
+                collectorPullCacheStatus("EmptyPullCacheLATER", cacheMsgList);
                 StatUtil.addPv("EmptyPullCacheLATER", 1);
                 return LATER;
             }
             StatUtil.addPv("EmptyPullCache", 1);
-            collectorPullCacheStatus("EmptyPullCache");
+            collectorPullCacheStatus("EmptyPullCache", cacheMsgList);
             CompletableFuture<PullResult> pullResult = 
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, 
count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -230,7 +232,7 @@ public class QueueCache {
 
         if (queueOffset.getOffset() < cacheMsgList.get(0).getOffset()) {
             StatUtil.addPv("OutPullCache", 1);
-            collectorPullCacheStatus("OutPullCache");
+            collectorPullCacheStatus("OutPullCache", cacheMsgList);
             CompletableFuture<PullResult> pullResult = 
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, 
count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -249,11 +251,11 @@ public class QueueCache {
         if (resultMsgs.isEmpty()) {
             if (loadEvent.get(queue) != null) {
                 StatUtil.addPv("PullCacheLATER", 1);
-                collectorPullCacheStatus("PullCacheLATER");
+                collectorPullCacheStatus("PullCacheLATER", resultMsgs);
                 return LATER;
             }
             StatUtil.addPv("OutPullCache2", 1);
-            collectorPullCacheStatus("OutPullCache2");
+            collectorPullCacheStatus("OutPullCache2", resultMsgs);
             CompletableFuture<PullResult> pullResult = 
lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, 
count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -262,21 +264,34 @@ public class QueueCache {
         pullResult.setMessageList(resultMsgs);
         callBackResult.complete(pullResult);
         StatUtil.addPv("PullFromCache", 1);
-        collectorPullCacheStatus("PullFromCache");
+        collectorPullCacheStatus("PullFromCache", resultMsgs);
         if (loadEvent.get(queue) != null) {
             return LATER;
         }
         return DONE;
     }
 
-    private void collectorPullCacheStatus(String pullCacheStatus) {
+    private void collectorPullCacheStatus(String pullCacheStatus, List<Message> resultMsgs) {
         try {
             MqttMetricsCollector.collectPullCacheStatusTps(1, pullCacheStatus);
+            collectReadBytes(resultMsgs);
         } catch (Throwable e) {
             logger.error("", e);
         }
     }
 
+    private void collectReadBytes(List<Message> msgFoundList) throws PrometheusException {
+        if (null == msgFoundList || msgFoundList.isEmpty()) {
+            return;
+        }
+        Map<String, Integer> maps = msgFoundList.stream()
+                .collect(Collectors.groupingBy(Message::getFirstTopic,
+                        Collectors.summingInt(msg -> msg.getPayload().length)));
+        for (Map.Entry<String, Integer> entry : maps.entrySet()) {
+            MqttMetricsCollector.collectReadWriteMatchActionBytes(entry.getValue(), entry.getKey(), "pullCache");
+        }
+    }
+
     private void loadCache(boolean isFirst, String firstTopic, Queue queue, 
QueueOffset queueOffset, int count,
                            QueueEvent event) {
         loadStatus.put(queue, true);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 49fe87e..04b9415 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -255,6 +255,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
                     StatUtil.addPv(pullResult.getPullStatus().name(), 1);
                     try {
                         MqttMetricsCollector.collectPullStatusTps(1, 
pullResult.getPullStatus().name());
+                        collectReadBytes(pullResult.getMsgFoundList());
                     } catch (Throwable e) {
                         logger.error("collect prometheus error", e);
                     }
@@ -506,6 +507,7 @@ public class LmqQueueStoreManager implements LmqQueueStore {
                 StatUtil.addPv(popResult.getPopStatus().name(), 1);
                 try {
                     MqttMetricsCollector.collectPullStatusTps(1, 
popResult.getPopStatus().name());
+                    collectReadBytes(popResult.getMsgFoundList());
                 } catch (Throwable e) {
                     logger.error("collect prometheus error", e);
                 }
@@ -636,4 +638,16 @@ public class LmqQueueStoreManager implements LmqQueueStore {
 
         
mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(),
 timeoutMillis, ackCallback, ackMessageRequestHeader);
     }
+
+    private void collectReadBytes(List<MessageExt> msgFoundList) throws PrometheusException {
+        if (null == msgFoundList || msgFoundList.isEmpty()) {
+            return;
+        }
+        Map<String, Integer> maps = msgFoundList.stream()
+                .collect(Collectors.groupingBy(MessageExt::getTopic,
+                        Collectors.summingInt(msg -> msg.getBody().length)));
+        for (Map.Entry<String, Integer> entry : maps.entrySet()) {
+            MqttMetricsCollector.collectReadWriteMatchActionBytes(entry.getValue(), entry.getKey(), "pull");
+        }
+    }
 }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index 8de2726..ed00576 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.mqtt.ds.meta.WildcardManager;
 import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -105,7 +106,7 @@ public class PublishProcessor implements UpstreamProcessor, WillMsgSender {
         message.setMsgId(msgId);
         message.setBornTimestamp(bornTime);
         message.setEmpty(isEmpty);
-
+        collectWriteBytes(message.getFirstTopic(), message.getPayload().length);
         return lmqQueueStore.putMessage(queueNames, message);
     }
 
@@ -116,4 +117,12 @@ public class PublishProcessor implements UpstreamProcessor, WillMsgSender {
         ctx.setClientId(clientId);
         return put(ctx, message);
     }
+
+    private void collectWriteBytes(String topic, int length) {
+        try {
+            MqttMetricsCollector.collectReadWriteMatchActionBytes(length, topic, "put");
+        } catch (Throwable e) {
+            logger.error("Collect prometheus error", e);
+        }
+    }
 }
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
index d28e5b5..2580dd4 100644
--- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -169,6 +169,10 @@ public class MqttMetricsCollector {
         collect(MqttMetricsInfo.CONNECTIONS_SIZE, val, labels);
     }
 
+    public static void collectReadWriteMatchActionBytes(long val, String... labels) throws PrometheusException {
+        collect(MqttMetricsInfo.READ_WRITE_MATCH_ACTION_BYTES, val, labels);
+    }
+
     private static String labels2String(String... labels) {
         StringBuilder sb = new StringBuilder(128);
         for (String label : labels) {
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
index a395578..743f9d9 100644
--- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
@@ -37,7 +37,9 @@ public enum MqttMetricsInfo {
     READ_WRITE_MATCH_ACTION_RT(Type.GAUGE, SubSystem.DS, 
"read_write_match_action_rt", "lmq read write match action rt.", null,
         "hostName", "hostIp", "action", "status"),
     CONNECTIONS_SIZE(Type.GAUGE, SubSystem.CS, "connections_size", "server 
connections size.", null,
-        "hostName", "hostIp");
+        "hostName", "hostIp"),
+    READ_WRITE_MATCH_ACTION_BYTES(Type.COUNTER, SubSystem.DS, "read_write_match_action_bytes", "lmq read write match action bytes.", null,
+        "hostName", "hostIp", "topic", "action");
 
 
     private final Type type;

Reply via email to