This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d38558800c [ISSUE #7740] Optimize LocalFileOffsetStore
d38558800c is described below

commit d38558800c184ad34030388afec54715ad6784a8
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Tue Feb 6 09:20:02 2024 +0800

    [ISSUE #7740] Optimize LocalFileOffsetStore
---
 .../consumer/store/LocalFileOffsetStore.java       | 45 +++++++++++++++++++--
 .../consumer/store/LocalFileOffsetStoreTest.java   | 47 +++++++++++++++++++++-
 2 files changed, 88 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 074508c46b..38b0a5be35 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -138,10 +138,20 @@ public class LocalFileOffsetStore implements OffsetStore {
 
     @Override
     public void persistAll(Set<MessageQueue> mqs) {
-        if (null == mqs || mqs.isEmpty())
+        if (null == mqs || mqs.isEmpty()) {
             return;
+        }
+        OffsetSerializeWrapper offsetSerializeWrapper = null;
+        try {
+            offsetSerializeWrapper = readLocalOffset();
+        } catch (MQClientException e) {
+            log.error("readLocalOffset exception", e);
+            return;
+        }
 
-        OffsetSerializeWrapper offsetSerializeWrapper = new 
OffsetSerializeWrapper();
+        if (offsetSerializeWrapper == null) {
+            offsetSerializeWrapper = new OffsetSerializeWrapper();
+        }
         for (Map.Entry<MessageQueue, ControllableOffset> entry : 
this.offsetTable.entrySet()) {
             if (mqs.contains(entry.getKey())) {
                 AtomicLong offset = new 
AtomicLong(entry.getValue().getOffset());
@@ -161,11 +171,40 @@ public class LocalFileOffsetStore implements OffsetStore {
 
     @Override
     public void persist(MessageQueue mq) {
+        if (mq == null) {
+            return;
+        }
+        ControllableOffset offset = this.offsetTable.get(mq);
+        if (offset != null) {
+            OffsetSerializeWrapper offsetSerializeWrapper = null;
+            try {
+                offsetSerializeWrapper = readLocalOffset();
+            } catch (MQClientException e) {
+                log.error("readLocalOffset exception", e);
+                return;
+            }
+            if (offsetSerializeWrapper == null) {
+                offsetSerializeWrapper = new OffsetSerializeWrapper();
+            }
+            offsetSerializeWrapper.getOffsetTable().put(mq, new 
AtomicLong(offset.getOffset()));
+            String jsonString = offsetSerializeWrapper.toJson(true);
+            if (jsonString != null) {
+                try {
+                    MixAll.string2File(jsonString, this.storePath);
+                } catch (IOException e) {
+                    log.error("persist consumer offset exception, " + 
this.storePath, e);
+                }
+            }
+        }
     }
 
     @Override
     public void removeOffset(MessageQueue mq) {
-
+        if (mq != null) {
+            this.offsetTable.remove(mq);
+            log.info("remove unnecessary messageQueue offset. group={}, mq={}, 
offsetTableSize={}", this.groupName, mq,
+                offsetTable.size());
+        }
     }
 
     @Override
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index c31c708dbb..2f88523bc1 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.consumer.store;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -85,4 +86,48 @@ public class LocalFileOffsetStoreTest {
         assertThat(cloneOffsetTable.size()).isEqualTo(1);
         assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024);
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testPersist() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, 
group);
+
+        MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+        offsetStore.updateOffset(messageQueue0, 1024, false);
+        offsetStore.persist(messageQueue0);
+        assertThat(offsetStore.readOffset(messageQueue0, 
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+        MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+        assertThat(offsetStore.readOffset(messageQueue1, 
ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+    }
+
+    @Test
+    public void testPersistAll() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, 
group);
+
+        MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+        offsetStore.updateOffset(messageQueue0, 1024, false);
+        offsetStore.persistAll(new 
HashSet<MessageQueue>(Collections.singletonList(messageQueue0)));
+        assertThat(offsetStore.readOffset(messageQueue0, 
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+        MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+        MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2);
+        offsetStore.updateOffset(messageQueue1, 1025, false);
+        offsetStore.updateOffset(messageQueue2, 1026, false);
+        offsetStore.persistAll(new 
HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2)));
+
+        assertThat(offsetStore.readOffset(messageQueue0, 
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+        assertThat(offsetStore.readOffset(messageQueue1, 
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
+        assertThat(offsetStore.readOffset(messageQueue2, 
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026);
+    }
+
+    @Test
+    public void testRemoveOffset() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, 
group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+        offsetStore.updateOffset(messageQueue, 1024, false);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.removeOffset(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+    }
+}

Reply via email to