This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 40b25b36e0 [ISSUE #9217] Fix broker's inflight and available message counts incorrect when the pop consumer service is enabled (#9218) 40b25b36e0 is described below commit 40b25b36e0878614420ba685a3c90fd58026bee5 Author: lizhimins <707364...@qq.com> AuthorDate: Mon Mar 3 14:24:02 2025 +0800 [ISSUE #9217] Fix broker's inflight and available message counts incorrect when the pop consumer service is enabled (#9218) --- .../apache/rocketmq/broker/metrics/ConsumerLagCalculator.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java index 1b898f95de..35519c1d1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java @@ -348,7 +348,7 @@ public class ConsumerLagCalculator { brokerOffset = 0; } - if (isPop) { + if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) { long pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId); if (pullOffset < 0) { pullOffset = offsetManager.queryOffset(group, topic, queueId); @@ -401,7 +401,7 @@ public class ConsumerLagCalculator { public Pair<Long, Long> getInFlightMsgStats(String group, String topic, int queueId, boolean isPop) throws ConsumeQueueException { - if (isPop) { + if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) { long inflight = popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId); long pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId); if (pullOffset < 0) { @@ -456,14 +456,11 @@ public class ConsumerLagCalculator { } long pullOffset; - if (isPop) { + if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) { pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId); if (pullOffset < 0) { pullOffset = offsetManager.queryOffset(group, topic, queueId); } - if (pullOffset < 0) { - pullOffset = brokerOffset; - } } else { pullOffset = offsetManager.queryPullOffset(group, topic, queueId); }