This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 61ea51252c [ISSUE #9172] Clean pull offset and reset offset when delete subscription group (#9173) 61ea51252c is described below commit 61ea51252c7e2e8c9ce2f53702f8c66356501ada Author: lizhimins <707364...@qq.com> AuthorDate: Wed Feb 12 19:09:55 2025 +0800 [ISSUE #9172] Clean pull offset and reset offset when delete subscription group (#9173) * [ISSUE #9172] Clean pull offset and reset offset when delete subscription group * [ISSUE #9174] Add a collection of predefined Groups and common checking methods in the MixAll (#9175) Signed-off-by: ltamber <ltambe...@gmail.com> * [ISSUE #9177] Fix unstable tests in AdaptiveLockTest.testAdaptiveLock (#9178) --------- Signed-off-by: ltamber <ltambe...@gmail.com> Co-authored-by: ltamber <ltambe...@gmail.com> Co-authored-by: hqbfz <125714719+3424672...@users.noreply.github.com> --- .../broker/offset/ConsumerOffsetManager.java | 33 ++++++++++++++-------- .../broker/offset/ConsumerOffsetManagerTest.java | 14 +++++++++ 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 85bc8e3789..eafb47a89d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Strings; +import java.util.function.Function; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -395,19 +396,29 @@ public class ConsumerOffsetManager extends ConfigManager { } public void removeOffset(final String group) { - Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); - String topicAtGroup = next.getKey(); - if (topicAtGroup.contains(group)) { - String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2 && group.equals(arrays[1])) { - it.remove(); - removeConsumerOffset(topicAtGroup); - LOG.warn("clean group offset {}", topicAtGroup); + Function<Iterator<Entry<String, ConcurrentMap<Integer, Long>>>, Boolean> deleteFunction = it -> { + boolean removed = false; + while (it.hasNext()) { + Entry<String, ConcurrentMap<Integer, Long>> entry = it.next(); + String topicAtGroup = entry.getKey(); + if (topicAtGroup.contains(group)) { + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays.length == 2 && group.equals(arrays[1])) { + it.remove(); + removeConsumerOffset(topicAtGroup); + removed = true; + } } } - } + return removed; + }; + + boolean clearOffset = deleteFunction.apply(this.offsetTable.entrySet().iterator()); + boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator()); + boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator()); + + LOG.info("Consumer offset manager clean group offset, groupName={}, " + + "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull); } public void assignResetOffset(String topic, String group, int queueId, long offset) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index 7bd289a6f1..9fc553409d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.offset; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Assert; import org.junit.Before; @@ -27,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.mockito.Mockito; +import static org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR; import static org.assertj.core.api.Assertions.assertThat; public class ConsumerOffsetManagerTest { @@ -65,6 +67,18 @@ public class ConsumerOffsetManagerTest { assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue(); } + @Test + public void removeOffsetByGroupTest() { + String topic = "TopicName"; + String group = "GroupName"; + Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); + consumerOffsetManager.commitOffset("Commit", group, topic, 0, 100); + consumerOffsetManager.assignResetOffset(topic, group, 0, 100); + consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100); + consumerOffsetManager.removeOffset(group); + Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group)); + } + @Test public void testOffsetPersistInMemory() { ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = consumerOffsetManager.getOffsetTable();