This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3c2e81dcd4 [ISSUE #9369] Fix reset offset commit pull offset when use pop consumer service (#9370) 3c2e81dcd4 is described below commit 3c2e81dcd4ce2c2bd11d428b11362a620a771afd Author: lizhimins <707364...@qq.com> AuthorDate: Mon Apr 28 14:02:22 2025 +0800 [ISSUE #9369] Fix reset offset commit pull offset when use pop consumer service (#9370) --- broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java | 4 ---- .../java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java | 4 ++++ .../org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 3 +-- .../org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java | 4 ++++ 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 2083b769eb..d2f2ae6161 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1555,10 +1555,6 @@ public class BrokerController { this.consumerFilterManager.persist(); } - if (this.consumerOrderInfoManager != null) { - this.consumerOrderInfoManager.persist(); - } - if (this.scheduleMessageService != null) { this.scheduleMessageService.persist(); this.scheduleMessageService.shutdown(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index eafb47a89d..140604f521 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -283,6 +283,10 @@ public class ConsumerOffsetManager extends ConfigManager { return offset; } + public void clearPullOffset(final String group, final String topic) { + this.pullOffsetTable.remove(topic + TOPIC_GROUP_SEPARATOR + group); + } + @Override public String encode() { return this.encode(false); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 7064485e29..79279b8894 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2263,8 +2263,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey()); - brokerController.getConsumerOffsetManager().commitPullOffset( - "ResetOffsetInner", group, topic, entry.getKey(), entry.getValue()); + brokerController.getConsumerOffsetManager().clearPullOffset(group, topic); } body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue()); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index 9fc553409d..d980090a23 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -77,6 +77,10 @@ public class ConsumerOffsetManagerTest { consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100); consumerOffsetManager.removeOffset(group); Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group)); + + consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100); + consumerOffsetManager.clearPullOffset(group, topic); + Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, topic, 0)); } @Test