This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d38558800c [ISSUE #7740] Optimize LocalFileOffsetStore d38558800c is described below commit d38558800c184ad34030388afec54715ad6784a8 Author: Liu Shengzhong <szliu0...@gmail.com> AuthorDate: Tue Feb 6 09:20:02 2024 +0800 [ISSUE #7740] Optimize LocalFileOffsetStore --- .../consumer/store/LocalFileOffsetStore.java | 45 +++++++++++++++++++-- .../consumer/store/LocalFileOffsetStoreTest.java | 47 +++++++++++++++++++++- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 074508c46b..38b0a5be35 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -138,10 +138,20 @@ public class LocalFileOffsetStore implements OffsetStore { @Override public void persistAll(Set<MessageQueue> mqs) { - if (null == mqs || mqs.isEmpty()) + if (null == mqs || mqs.isEmpty()) { return; + } + OffsetSerializeWrapper offsetSerializeWrapper = null; + try { + offsetSerializeWrapper = readLocalOffset(); + } catch (MQClientException e) { + log.error("readLocalOffset exception", e); + return; + } - OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); + if (offsetSerializeWrapper == null) { + offsetSerializeWrapper = new OffsetSerializeWrapper(); + } for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) { if (mqs.contains(entry.getKey())) { AtomicLong offset = new AtomicLong(entry.getValue().getOffset()); @@ -161,11 +171,40 @@ public class LocalFileOffsetStore implements OffsetStore { @Override public void persist(MessageQueue mq) { + if (mq == null) { + return; + } + ControllableOffset offset = this.offsetTable.get(mq); + if (offset != 
null) { + OffsetSerializeWrapper offsetSerializeWrapper = null; + try { + offsetSerializeWrapper = readLocalOffset(); + } catch (MQClientException e) { + log.error("readLocalOffset exception", e); + return; + } + if (offsetSerializeWrapper == null) { + offsetSerializeWrapper = new OffsetSerializeWrapper(); + } + offsetSerializeWrapper.getOffsetTable().put(mq, new AtomicLong(offset.getOffset())); + String jsonString = offsetSerializeWrapper.toJson(true); + if (jsonString != null) { + try { + MixAll.string2File(jsonString, this.storePath); + } catch (IOException e) { + log.error("persist consumer offset exception, " + this.storePath, e); + } + } + } } @Override public void removeOffset(MessageQueue mq) { - + if (mq != null) { + this.offsetTable.remove(mq); + log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq, + offsetTable.size()); + } } @Override diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java index c31c708dbb..2f88523bc1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.consumer.store; import java.io.File; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -85,4 +86,48 @@ public class LocalFileOffsetStoreTest { assertThat(cloneOffsetTable.size()).isEqualTo(1); assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024); } -} \ No newline at end of file + + @Test + public void testPersist() throws Exception { + OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group); + + MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0); + offsetStore.updateOffset(messageQueue0, 
1024, false); + offsetStore.persist(messageQueue0); + assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); + + MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1); + assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1); + } + + @Test + public void testPersistAll() throws Exception { + OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group); + + MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0); + offsetStore.updateOffset(messageQueue0, 1024, false); + offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue0))); + assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); + + MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1); + MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2); + offsetStore.updateOffset(messageQueue1, 1025, false); + offsetStore.updateOffset(messageQueue2, 1026, false); + offsetStore.persistAll(new HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2))); + + assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); + assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025); + assertThat(offsetStore.readOffset(messageQueue2, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026); + } + + @Test + public void testRemoveOffset() throws Exception { + OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0); + offsetStore.updateOffset(messageQueue, 1024, false); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + + offsetStore.removeOffset(messageQueue); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1); + } +}