This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f32fe78ca0 [ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset 
(#9087)
f32fe78ca0 is described below

commit f32fe78ca039fc2fec3341323dc61e8a9e486368
Author: lizhimins <707364...@qq.com>
AuthorDate: Mon Dec 30 20:05:20 2024 +0800

    [ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset (#9087)
---
 .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 5856873955..6bcf9aaa0f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2187,9 +2187,13 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         ResetOffsetBody body = new ResetOffsetBody();
         String brokerName = brokerController.getBrokerConfig().getBrokerName();
         for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
-            
brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, 
group, entry.getKey());
+            if (brokerController.getPopInflightMessageCounter() != null) {
+                
brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, 
group, entry.getKey());
+            }
             if 
(brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
-                brokerController.getPopConsumerService().clearCache(group, 
topic, queueId);
+                brokerController.getPopConsumerService().clearCache(group, 
topic, entry.getKey());
+                brokerController.getConsumerOffsetManager().commitPullOffset(
+                    "ResetOffsetInner", group, topic, entry.getKey(), 
entry.getValue());
             }
             body.getOffsetTable().put(new MessageQueue(topic, brokerName, 
entry.getKey()), entry.getValue());
         }

Reply via email to