This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8505482c0b fix: avoid memory overhead when there is large number of LMQ ConsumeQueue (#8956)
8505482c0b is described below

commit 8505482c0b05b6dceb2e3a372bd8c9848c26c244
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Wed Nov 20 14:55:30 2024 +0800

    fix: avoid memory overhead when there is large number of LMQ ConsumeQueue (#8956)
---
 .../apache/rocketmq/store/queue/AbstractConsumeQueueStore.java    | 6 +++++-
 .../org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 8 +++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
index dfce665d8f..ef693dc1e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
@@ -39,7 +39,11 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte
     public AbstractConsumeQueueStore(DefaultMessageStore messageStore) {
         this.messageStore = messageStore;
         this.messageStoreConfig = messageStore.getMessageStoreConfig();
-        this.consumeQueueTable = new ConcurrentHashMap<>(32);
+        if (messageStoreConfig.isEnableLmq()) {
+            this.consumeQueueTable = new ConcurrentHashMap<>(32_768);
+        } else {
+            this.consumeQueueTable = new ConcurrentHashMap<>(32);
+        }
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 67a0015743..0242ec2309 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -480,7 +480,13 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
     public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int 
queueId) {
         ConcurrentMap<Integer, ConsumeQueueInterface> map = 
this.consumeQueueTable.get(topic);
         if (null == map) {
-            ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new 
ConcurrentHashMap<>(128);
+            ConcurrentMap<Integer, ConsumeQueueInterface> newMap;
+            if (MixAll.isLmq(topic)) {
+                // For LMQ, no need to over allocate internal hashtable
+                newMap = new ConcurrentHashMap<>(1, 1.0F);
+            } else {
+                newMap = new ConcurrentHashMap<>(8);
+            }
             ConcurrentMap<Integer, ConsumeQueueInterface> oldMap = 
this.consumeQueueTable.putIfAbsent(topic, newMap);
             if (oldMap != null) {
                 map = oldMap;

Reply via email to