This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new 99bb415db4 [ISSUE #7740] Optimize LocalFileOffsetStore (#7745)
99bb415db4 is described below

commit 99bb415db46796a16205a6747f0627cc88178835
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Mon Jan 29 08:50:43 2024 +0800

    [ISSUE #7740] Optimize LocalFileOffsetStore (#7745)
    
    * Fix LocalFileOffsetStore persistAll and persist
    
    * Fix LocalFileOffsetStore removeOffset
    
    * Add test case
---
 .../consumer/store/LocalFileOffsetStore.java       | 45 ++++++++++++++++++++--
 .../consumer/store/LocalFileOffsetStoreTest.java   | 45 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index f949b75a81..0fbe8010db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -131,10 +131,20 @@ public class LocalFileOffsetStore implements OffsetStore {
 
     @Override
     public void persistAll(Set<MessageQueue> mqs) {
-        if (null == mqs || mqs.isEmpty())
+        if (null == mqs || mqs.isEmpty()) {
             return;
+        }
+        OffsetSerializeWrapper offsetSerializeWrapper = null;
+        try {
+            offsetSerializeWrapper = readLocalOffset();
+        } catch (MQClientException e) {
+            log.error("readLocalOffset exception", e);
+            return;
+        }
 
-        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
+        if (offsetSerializeWrapper == null) {
+            offsetSerializeWrapper = new OffsetSerializeWrapper();
+        }
         for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
             if (mqs.contains(entry.getKey())) {
                 AtomicLong offset = entry.getValue();
@@ -154,11 +164,40 @@ public class LocalFileOffsetStore implements OffsetStore {
 
     @Override
     public void persist(MessageQueue mq) {
+        if (mq == null) {
+            return;
+        }
+        AtomicLong offset = this.offsetTable.get(mq);
+        if (offset != null) {
+            OffsetSerializeWrapper offsetSerializeWrapper = null;
+            try {
+                offsetSerializeWrapper = readLocalOffset();
+            } catch (MQClientException e) {
+                log.error("readLocalOffset exception", e);
+                return;
+            }
+            if (offsetSerializeWrapper == null) {
+                offsetSerializeWrapper = new OffsetSerializeWrapper();
+            }
+            offsetSerializeWrapper.getOffsetTable().put(mq, offset);
+            String jsonString = offsetSerializeWrapper.toJson(true);
+            if (jsonString != null) {
+                try {
+                    MixAll.string2File(jsonString, this.storePath);
+                } catch (IOException e) {
+                    log.error("persist consumer offset exception, " + this.storePath, e);
+                }
+            }
+        }
     }
 
     @Override
     public void removeOffset(MessageQueue mq) {
-
+        if (mq != null) {
+            this.offsetTable.remove(mq);
+            log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
+                offsetTable.size());
+        }
     }
 
     @Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index a705b30fc3..e3f2cc070d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.consumer.store;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -85,4 +86,48 @@ public class LocalFileOffsetStoreTest {
         assertThat(cloneOffsetTable.size()).isEqualTo(1);
         assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024);
     }
+
+    @Test
+    public void testPersist() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+
+        MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+        offsetStore.updateOffset(messageQueue0, 1024, false);
+        offsetStore.persist(messageQueue0);
+        assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+        MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+        assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+    }
+
+    @Test
+    public void testPersistAll() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+
+        MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+        offsetStore.updateOffset(messageQueue0, 1024, false);
+        offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue0)));
+        assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+        MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+        MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2);
+        offsetStore.updateOffset(messageQueue1, 1025, false);
+        offsetStore.updateOffset(messageQueue2, 1026, false);
+        offsetStore.persistAll(new HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2)));
+
+        assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+        assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
+        assertThat(offsetStore.readOffset(messageQueue2, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026);
+    }
+
+    @Test
+    public void testRemoveOffset() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+        offsetStore.updateOffset(messageQueue, 1024, false);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.removeOffset(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+    }
 }
\ No newline at end of file

Reply via email to