This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push: new 95fbf79 feat: add metrics for topic dimension read and write msg size (#271) 95fbf79 is described below commit 95fbf7936a53444a135ac8ecc45742bde62e86c6 Author: yx9o <yangx_s...@163.com> AuthorDate: Fri May 10 20:58:57 2024 +0800 feat: add metrics for topic dimension read and write msg size (#271) --- .../rocketmq/mqtt/cs/session/loop/QueueCache.java | 35 +++++++++++++++------- .../mqtt/ds/store/LmqQueueStoreManager.java | 14 +++++++++ .../ds/upstream/processor/PublishProcessor.java | 11 ++++++- .../exporter/collector/MqttMetricsCollector.java | 4 +++ .../mqtt/exporter/collector/MqttMetricsInfo.java | 4 ++- 5 files changed, 56 insertions(+), 12 deletions(-) diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java index 771d09f..52f91b8 100644 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.mqtt.common.util.StatUtil; import org.apache.rocketmq.mqtt.cs.config.ConnectConf; import org.apache.rocketmq.mqtt.cs.session.Session; import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector; +import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -52,6 +53,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static org.apache.rocketmq.mqtt.cs.session.loop.PullResultStatus.DONE; import static org.apache.rocketmq.mqtt.cs.session.loop.PullResultStatus.LATER; @@ -183,7 +185,7 @@ public class QueueCache { CompletableFuture<PullResult> callBackResult) { if (subscription.isP2p() || subscription.isRetry()) { StatUtil.addPv("NotPullCache", 1); - collectorPullCacheStatus("NotPullCache"); + collectorPullCacheStatus("NotPullCache", null); CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count); callbackResult(pullResult, callBackResult); return DONE; @@ -198,7 +200,7 @@ public class QueueCache { CacheEntry cacheEntry = cache.getIfPresent(queue); if (cacheEntry == null) { StatUtil.addPv("NoPullCache", 1); - collectorPullCacheStatus("NotPullCache"); + collectorPullCacheStatus("NotPullCache", null); CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count); callbackResult(pullResult, callBackResult); return DONE; @@ -206,7 +208,7 @@ public class QueueCache { if (cacheEntry.loading.get()) { if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) { StatUtil.addPv("LoadPullCacheTimeout", 1); - collectorPullCacheStatus("LoadPullCacheTimeout"); + collectorPullCacheStatus("LoadPullCacheTimeout", null); CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count); callbackResult(pullResult, callBackResult); return DONE; @@ -217,12 +219,12 @@ public class QueueCache { List<Message> cacheMsgList = cacheEntry.messageList; if (cacheMsgList.isEmpty()) { if (loadEvent.get(queue) != null) { - collectorPullCacheStatus("EmptyPullCacheLATER"); + collectorPullCacheStatus("EmptyPullCacheLATER", cacheMsgList); StatUtil.addPv("EmptyPullCacheLATER", 1); return LATER; } StatUtil.addPv("EmptyPullCache", 1); - collectorPullCacheStatus("EmptyPullCache"); + collectorPullCacheStatus("EmptyPullCache", cacheMsgList); CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count); callbackResult(pullResult, callBackResult); return DONE; @@ -230,7 +232,7 @@ public class QueueCache { if (queueOffset.getOffset() < cacheMsgList.get(0).getOffset()) { StatUtil.addPv("OutPullCache", 1); - collectorPullCacheStatus("OutPullCache"); + collectorPullCacheStatus("OutPullCache", cacheMsgList); CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count); callbackResult(pullResult, callBackResult); return DONE; @@ -249,11 +251,11 @@ public class QueueCache { if (resultMsgs.isEmpty()) { if (loadEvent.get(queue) != null) { StatUtil.addPv("PullCacheLATER", 1); - collectorPullCacheStatus("PullCacheLATER"); + collectorPullCacheStatus("PullCacheLATER", resultMsgs); return LATER; } StatUtil.addPv("OutPullCache2", 1); - collectorPullCacheStatus("OutPullCache2"); + collectorPullCacheStatus("OutPullCache2", resultMsgs); CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count); callbackResult(pullResult, callBackResult); return DONE; @@ -262,21 +264,34 @@ public class QueueCache { pullResult.setMessageList(resultMsgs); callBackResult.complete(pullResult); StatUtil.addPv("PullFromCache", 1); - collectorPullCacheStatus("PullFromCache"); + collectorPullCacheStatus("PullFromCache", resultMsgs); if (loadEvent.get(queue) != null) { return LATER; } return DONE; } - private void collectorPullCacheStatus(String pullCacheStatus) { + private void collectorPullCacheStatus(String pullCacheStatus, List<Message> resultMsgs) { try { MqttMetricsCollector.collectPullCacheStatusTps(1, pullCacheStatus); + collectReadBytes(resultMsgs); } catch (Throwable e) { logger.error("", e); } } + private void collectReadBytes(List<Message> msgFoundList) throws PrometheusException { + if (null == msgFoundList || msgFoundList.isEmpty()) { + return; + } + Map<String, Integer> maps = msgFoundList.stream() + .collect(Collectors.groupingBy(Message::getFirstTopic, + Collectors.summingInt(msg -> msg.getPayload().length))); + for (Map.Entry<String, Integer> entry : maps.entrySet()) { + MqttMetricsCollector.collectReadWriteMatchActionBytes(entry.getValue(), entry.getKey(), "pullCache"); + } + } + private void loadCache(boolean isFirst, String firstTopic, Queue queue, QueueOffset queueOffset, int count, QueueEvent event) { loadStatus.put(queue, true); diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java index 49fe87e..04b9415 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java @@ -255,6 +255,7 @@ public class LmqQueueStoreManager implements LmqQueueStore { StatUtil.addPv(pullResult.getPullStatus().name(), 1); try { MqttMetricsCollector.collectPullStatusTps(1, pullResult.getPullStatus().name()); + collectReadBytes(pullResult.getMsgFoundList()); } catch (Throwable e) { logger.error("collect prometheus error", e); } @@ -506,6 +507,7 @@ public class LmqQueueStoreManager implements LmqQueueStore { StatUtil.addPv(popResult.getPopStatus().name(), 1); try { MqttMetricsCollector.collectPullStatusTps(1, popResult.getPopStatus().name()); + collectReadBytes(popResult.getMsgFoundList()); } catch (Throwable e) { logger.error("collect prometheus error", e); } @@ -636,4 +638,16 @@ public class LmqQueueStoreManager implements LmqQueueStore { mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), timeoutMillis, ackCallback, ackMessageRequestHeader); } + + private void collectReadBytes(List<MessageExt> msgFoundList) throws PrometheusException { + if (null == msgFoundList || msgFoundList.isEmpty()) { + return; + } + Map<String, Integer> maps = msgFoundList.stream() + .collect(Collectors.groupingBy(MessageExt::getTopic, + Collectors.summingInt(msg -> msg.getBody().length))); + for (Map.Entry<String, Integer> entry : maps.entrySet()) { + MqttMetricsCollector.collectReadWriteMatchActionBytes(entry.getValue(), entry.getKey(), "pull"); + } + } } diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java index 8de2726..ed00576 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.mqtt.common.util.TopicUtils; import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager; import org.apache.rocketmq.mqtt.ds.meta.WildcardManager; import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor; +import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -105,7 +106,7 @@ public class PublishProcessor implements UpstreamProcessor, WillMsgSender { message.setMsgId(msgId); message.setBornTimestamp(bornTime); message.setEmpty(isEmpty); - + collectWriteBytes(message.getFirstTopic(), message.getPayload().length); return lmqQueueStore.putMessage(queueNames, message); } @@ -116,4 +117,12 @@ public class PublishProcessor implements UpstreamProcessor, WillMsgSender { ctx.setClientId(clientId); return put(ctx, message); } + + private void collectWriteBytes(String topic, int length) { + try { + MqttMetricsCollector.collectReadWriteMatchActionBytes(length, topic, "put"); + } catch (Throwable e) { + logger.error("Collect prometheus error", e); + } + } } diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java index d28e5b5..2580dd4 100644 --- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java +++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java @@ -169,6 +169,10 @@ public class MqttMetricsCollector { collect(MqttMetricsInfo.CONNECTIONS_SIZE, val, labels); } + public static void collectReadWriteMatchActionBytes(long val, String... labels) throws PrometheusException { + collect(MqttMetricsInfo.READ_WRITE_MATCH_ACTION_BYTES, val, labels); + } + private static String labels2String(String... labels) { StringBuilder sb = new StringBuilder(128); for (String label : labels) { diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java index a395578..743f9d9 100644 --- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java +++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java @@ -37,7 +37,9 @@ public enum MqttMetricsInfo { READ_WRITE_MATCH_ACTION_RT(Type.GAUGE, SubSystem.DS, "read_write_match_action_rt", "lmq read write match action rt.", null, "hostName", "hostIp", "action", "status"), CONNECTIONS_SIZE(Type.GAUGE, SubSystem.CS, "connections_size", "server connections size.", null, - "hostName", "hostIp"); + "hostName", "hostIp"), + READ_WRITE_MATCH_ACTION_BYTES(Type.COUNTER, SubSystem.DS, "read_write_match_action_bytes", "lmq read write match action bytes.", null, + "hostName", "hostIp", "topic", "action"); private final Type type;