This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new f32fe78ca0 [ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset (#9087) f32fe78ca0 is described below commit f32fe78ca039fc2fec3341323dc61e8a9e486368 Author: lizhimins <707364...@qq.com> AuthorDate: Mon Dec 30 20:05:20 2024 +0800 [ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset (#9087) --- .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 5856873955..6bcf9aaa0f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2187,9 +2187,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ResetOffsetBody body = new ResetOffsetBody(); String brokerName = brokerController.getBrokerConfig().getBrokerName(); for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) { - brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey()); + if (brokerController.getPopInflightMessageCounter() != null) { + brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey()); + } if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { - brokerController.getPopConsumerService().clearCache(group, topic, queueId); + brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey()); + brokerController.getConsumerOffsetManager().commitPullOffset( + "ResetOffsetInner", group, topic, entry.getKey(), entry.getValue()); } body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue()); }