This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 61ea51252c [ISSUE #9172] Clean pull offset and reset offset when delete subscription group (#9173)
61ea51252c is described below

commit 61ea51252c7e2e8c9ce2f53702f8c66356501ada
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Feb 12 19:09:55 2025 +0800

    [ISSUE #9172] Clean pull offset and reset offset when delete subscription group (#9173)
    
    * [ISSUE #9172] Clean pull offset and reset offset when delete subscription group
    
    * [ISSUE #9174] Add a collection of predefined Groups and common checking methods in the MixAll (#9175)
    
    Signed-off-by: ltamber <ltambe...@gmail.com>
    
    * [ISSUE #9177] Fix unstable tests in AdaptiveLockTest.testAdaptiveLock (#9178)
    
    ---------
    
    Signed-off-by: ltamber <ltambe...@gmail.com>
    Co-authored-by: ltamber <ltambe...@gmail.com>
    Co-authored-by: hqbfz <125714719+3424672...@users.noreply.github.com>
---
 .../broker/offset/ConsumerOffsetManager.java       | 33 ++++++++++++++--------
 .../broker/offset/ConsumerOffsetManagerTest.java   | 14 +++++++++
 2 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 85bc8e3789..eafb47a89d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Strings;
 
+import java.util.function.Function;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -395,19 +396,29 @@ public class ConsumerOffsetManager extends ConfigManager {
     }
 
     public void removeOffset(final String group) {
-        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = 
this.offsetTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
-            String topicAtGroup = next.getKey();
-            if (topicAtGroup.contains(group)) {
-                String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-                if (arrays.length == 2 && group.equals(arrays[1])) {
-                    it.remove();
-                    removeConsumerOffset(topicAtGroup);
-                    LOG.warn("clean group offset {}", topicAtGroup);
+        Function<Iterator<Entry<String, ConcurrentMap<Integer, Long>>>, 
Boolean> deleteFunction = it -> {
+            boolean removed = false;
+            while (it.hasNext()) {
+                Entry<String, ConcurrentMap<Integer, Long>> entry = it.next();
+                String topicAtGroup = entry.getKey();
+                if (topicAtGroup.contains(group)) {
+                    String[] arrays = 
topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+                    if (arrays.length == 2 && group.equals(arrays[1])) {
+                        it.remove();
+                        removeConsumerOffset(topicAtGroup);
+                        removed = true;
+                    }
                 }
             }
-        }
+            return removed;
+        };
+
+        boolean clearOffset = 
deleteFunction.apply(this.offsetTable.entrySet().iterator());
+        boolean clearReset = 
deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
+        boolean clearPull = 
deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());
+
+        LOG.info("Consumer offset manager clean group offset, groupName={}, " +
+            "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, 
clearOffset, clearReset, clearPull);
     }
 
     public void assignResetOffset(String topic, String group, int queueId, 
long offset) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 7bd289a6f1..9fc553409d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.broker.offset;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Assert;
 import org.junit.Before;
@@ -27,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.mockito.Mockito;
 
+import static 
org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class ConsumerOffsetManagerTest {
@@ -65,6 +67,18 @@ public class ConsumerOffsetManagerTest {
         
assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue();
     }
 
+    @Test
+    public void removeOffsetByGroupTest() {
+        String topic = "TopicName";
+        String group = "GroupName";
+        Mockito.when(brokerController.getBrokerConfig()).thenReturn(new 
BrokerConfig());
+        consumerOffsetManager.commitOffset("Commit", group, topic, 0, 100);
+        consumerOffsetManager.assignResetOffset(topic, group, 0, 100);
+        consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
+        consumerOffsetManager.removeOffset(group);
+        
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + 
TOPIC_GROUP_SEPARATOR + group));
+    }
+
     @Test
     public void testOffsetPersistInMemory() {
         ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = 
consumerOffsetManager.getOffsetTable();

Reply via email to