This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e75554d5a8 [ISSUE #8804] clean offset when remove group offset e75554d5a8 is described below commit e75554d5a8b7708d5a8a5ae9bd723b614f8adf7c Author: Lei Zhiyuan <leizhiy...@gmail.com> AuthorDate: Thu Oct 10 09:53:06 2024 +0800 [ISSUE #8804] clean offset when remove group offset --- .../broker/offset/LmqConsumerOffsetManager.java | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index ce70b1a820..53e9e2e063 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.offset; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -110,4 +111,25 @@ public class LmqConsumerOffsetManager extends ConsumerOffsetManager { public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) { this.lmqOffsetTable = lmqOffsetTable; } + + @Override + public void removeOffset(String group) { + if (!MixAll.isLmq(group)) { + super.removeOffset(group); + return; + } + Iterator<Map.Entry<String, Long>> it = this.lmqOffsetTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, Long> next = it.next(); + String topicAtGroup = next.getKey(); + if (topicAtGroup.contains(group)) { + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays.length == 2 && group.equals(arrays[1])) { + it.remove(); + removeConsumerOffset(topicAtGroup); + LOG.warn("clean lmq group offset {}", topicAtGroup); + } + } + } + } }