This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3c2e81dcd4 [ISSUE #9369] Fix reset offset commit pull offset when use pop consumer service (#9370)
3c2e81dcd4 is described below

commit 3c2e81dcd4ce2c2bd11d428b11362a620a771afd
Author: lizhimins <707364...@qq.com>
AuthorDate: Mon Apr 28 14:02:22 2025 +0800

    [ISSUE #9369] Fix reset offset commit pull offset when use pop consumer service (#9370)
---
 broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java | 4 ----
 .../java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java | 4 ++++
 .../org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java    | 3 +--
 .../org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java  | 4 ++++
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2083b769eb..d2f2ae6161 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1555,10 +1555,6 @@ public class BrokerController {
             this.consumerFilterManager.persist();
         }
 
-        if (this.consumerOrderInfoManager != null) {
-            this.consumerOrderInfoManager.persist();
-        }
-
         if (this.scheduleMessageService != null) {
             this.scheduleMessageService.persist();
             this.scheduleMessageService.shutdown();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index eafb47a89d..140604f521 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -283,6 +283,10 @@ public class ConsumerOffsetManager extends ConfigManager {
         return offset;
     }
 
+    public void clearPullOffset(final String group, final String topic) {
+        this.pullOffsetTable.remove(topic + TOPIC_GROUP_SEPARATOR + group);
+    }
+
     @Override
     public String encode() {
         return this.encode(false);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 7064485e29..79279b8894 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2263,8 +2263,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             }
             if 
(brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
                 brokerController.getPopConsumerService().clearCache(group, 
topic, entry.getKey());
-                brokerController.getConsumerOffsetManager().commitPullOffset(
-                    "ResetOffsetInner", group, topic, entry.getKey(), 
entry.getValue());
+                
brokerController.getConsumerOffsetManager().clearPullOffset(group, topic);
             }
             body.getOffsetTable().put(new MessageQueue(topic, brokerName, 
entry.getKey()), entry.getValue());
         }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 9fc553409d..d980090a23 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -77,6 +77,10 @@ public class ConsumerOffsetManagerTest {
         consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
         consumerOffsetManager.removeOffset(group);
         
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + 
TOPIC_GROUP_SEPARATOR + group));
+
+        consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
+        consumerOffsetManager.clearPullOffset(group, topic);
+        Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, 
topic, 0));
     }
 
     @Test

Reply via email to