This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e75554d5a8 [ISSUE #8804] clean offset when remove group offset
e75554d5a8 is described below

commit e75554d5a8b7708d5a8a5ae9bd723b614f8adf7c
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Thu Oct 10 09:53:06 2024 +0800

    [ISSUE #8804] clean offset when remove group offset
---
 .../broker/offset/LmqConsumerOffsetManager.java    | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index ce70b1a820..53e9e2e063 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.offset;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -110,4 +111,25 @@ public class LmqConsumerOffsetManager extends 
ConsumerOffsetManager {
     public void setLmqOffsetTable(ConcurrentHashMap<String, Long> 
lmqOffsetTable) {
         this.lmqOffsetTable = lmqOffsetTable;
     }
+
+    @Override
+    public void removeOffset(String group) {
+        if (!MixAll.isLmq(group)) {
+            super.removeOffset(group);
+            return;
+        }
+        Iterator<Map.Entry<String, Long>> it = 
this.lmqOffsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, Long> next = it.next();
+            String topicAtGroup = next.getKey();
+            if (topicAtGroup.contains(group)) {
+                String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+                if (arrays.length == 2 && group.equals(arrays[1])) {
+                    it.remove();
+                    removeConsumerOffset(topicAtGroup);
+                    LOG.warn("clean lmq group offset {}", topicAtGroup);
+                }
+            }
+        }
+    }
 }

Reply via email to