This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 40b25b36e0 [ISSUE #9217] Fix broker's inflight and available message counts incorrect when the pop consumer service is enabled (#9218)
40b25b36e0 is described below

commit 40b25b36e0878614420ba685a3c90fd58026bee5
Author: lizhimins <707364...@qq.com>
AuthorDate: Mon Mar 3 14:24:02 2025 +0800

    [ISSUE #9217] Fix broker's inflight and available message counts incorrect when the pop consumer service is enabled (#9218)
---
 .../apache/rocketmq/broker/metrics/ConsumerLagCalculator.java    | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 1b898f95de..35519c1d1c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -348,7 +348,7 @@ public class ConsumerLagCalculator {
             brokerOffset = 0;
         }
 
-        if (isPop) {
+        if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) {
             long pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
             if (pullOffset < 0) {
                 pullOffset = offsetManager.queryOffset(group, topic, queueId);
@@ -401,7 +401,7 @@ public class ConsumerLagCalculator {
 
     public Pair<Long, Long> getInFlightMsgStats(String group, String topic, int queueId, boolean isPop)
         throws ConsumeQueueException {
-        if (isPop) {
+        if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) {
             long inflight = popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
             long pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
             if (pullOffset < 0) {
@@ -456,14 +456,11 @@ public class ConsumerLagCalculator {
         }
 
         long pullOffset;
-        if (isPop) {
+        if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) {
             pullOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
             if (pullOffset < 0) {
                 pullOffset = offsetManager.queryOffset(group, topic, queueId);
             }
-            if (pullOffset < 0) {
-                pullOffset = brokerOffset;
-            }
         } else {
             pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
         }

Reply via email to