This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 8505482c0b fix: avoid memory overhead when there is large number of LMQ ConsumeQueue (#8956)
8505482c0b is described below

commit 8505482c0b05b6dceb2e3a372bd8c9848c26c244
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Wed Nov 20 14:55:30 2024 +0800

    fix: avoid memory overhead when there is large number of LMQ ConsumeQueue (#8956)
---
 .../apache/rocketmq/store/queue/AbstractConsumeQueueStore.java    | 6 +++++-
 .../org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 8 +++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
index dfce665d8f..ef693dc1e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
@@ -39,7 +39,11 @@ public abstract class AbstractConsumeQueueStore implements ConsumeQueueStoreInte
     public AbstractConsumeQueueStore(DefaultMessageStore messageStore) {
         this.messageStore = messageStore;
         this.messageStoreConfig = messageStore.getMessageStoreConfig();
-        this.consumeQueueTable = new ConcurrentHashMap<>(32);
+        if (messageStoreConfig.isEnableLmq()) {
+            this.consumeQueueTable = new ConcurrentHashMap<>(32_768);
+        } else {
+            this.consumeQueueTable = new ConcurrentHashMap<>(32);
+        }
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 67a0015743..0242ec2309 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -480,7 +480,13 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
     public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId) {
         ConcurrentMap<Integer, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
         if (null == map) {
-            ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new ConcurrentHashMap<>(128);
+            ConcurrentMap<Integer, ConsumeQueueInterface> newMap;
+            if (MixAll.isLmq(topic)) {
+                // For LMQ, no need to over allocate internal hashtable
+                newMap = new ConcurrentHashMap<>(1, 1.0F);
+            } else {
+                newMap = new ConcurrentHashMap<>(8);
+            }
             ConcurrentMap<Integer, ConsumeQueueInterface> oldMap = this.consumeQueueTable.putIfAbsent(topic, newMap);
             if (oldMap != null) {
                 map = oldMap;