This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3c2e81dcd4 [ISSUE #9369] Fix reset offset commit pull offset when use
pop consumer service (#9370)
3c2e81dcd4 is described below
commit 3c2e81dcd4ce2c2bd11d428b11362a620a771afd
Author: lizhimins <[email protected]>
AuthorDate: Mon Apr 28 14:02:22 2025 +0800
[ISSUE #9369] Fix reset offset commit pull offset when use pop consumer
service (#9370)
---
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java | 4 ----
.../java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java | 4 ++++
.../org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 3 +--
.../org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java | 4 ++++
4 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2083b769eb..d2f2ae6161 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1555,10 +1555,6 @@ public class BrokerController {
this.consumerFilterManager.persist();
}
- if (this.consumerOrderInfoManager != null) {
- this.consumerOrderInfoManager.persist();
- }
-
if (this.scheduleMessageService != null) {
this.scheduleMessageService.persist();
this.scheduleMessageService.shutdown();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index eafb47a89d..140604f521 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -283,6 +283,10 @@ public class ConsumerOffsetManager extends ConfigManager {
return offset;
}
+ public void clearPullOffset(final String group, final String topic) {
+ this.pullOffsetTable.remove(topic + TOPIC_GROUP_SEPARATOR + group);
+ }
+
@Override
public String encode() {
return this.encode(false);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 7064485e29..79279b8894 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2263,8 +2263,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey());
- brokerController.getConsumerOffsetManager().commitPullOffset(
- "ResetOffsetInner", group, topic, entry.getKey(), entry.getValue());
+ brokerController.getConsumerOffsetManager().clearPullOffset(group, topic);
}
body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 9fc553409d..d980090a23 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -77,6 +77,10 @@ public class ConsumerOffsetManagerTest {
consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
consumerOffsetManager.removeOffset(group);
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group));
+
+ consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
+ consumerOffsetManager.clearPullOffset(group, topic);
+ Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, topic, 0));
}
@Test