This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2089abd3be [ISSUE #9080] Fix tranfer logic when get large messages from cache in tiered storage (#9079) 2089abd3be is described below commit 2089abd3be5c4f34d2a981caf32747a1e624f2ed Author: yuz10 <845238...@qq.com> AuthorDate: Thu Dec 26 14:49:00 2024 +0800 [ISSUE #9080] Fix tranfer logic when get large messages from cache in tiered storage (#9079) --- .../apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java | 5 +++++ .../rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java | 1 + 2 files changed, 6 insertions(+) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index 7f79dbcd98..e94185626a 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -56,6 +56,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { private final String brokerName; private final MetadataStore metadataStore; private final MessageStoreConfig storeConfig; + private final org.apache.rocketmq.store.config.MessageStoreConfig messageStoreConfig; private final TieredMessageStore messageStore; private final IndexService indexService; private final FlatFileStore flatFileStore; @@ -71,6 +72,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { FlatFileStore flatFileStore, IndexService indexService) { this.storeConfig = storeConfig; + this.messageStoreConfig = messageStore.getMessageStoreConfig(); this.brokerName = storeConfig.getBrokerName(); this.flatFileStore = flatFileStore; this.messageStore = messageStore; @@ -148,6 +150,9 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { if (result.getMessageCount() == maxCount) { break; } + if (result.getBufferTotalSize() >= messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) { + break; + } } result.setStatus(result.getMessageCount() > 0 ? GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java index 6b96076948..7a43e1ede8 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java @@ -106,6 +106,7 @@ public class MessageStoreDispatcherImplTest { Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor); Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore); Mockito.when(messageStore.getIndexService()).thenReturn(indexService); + Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig()); // mock message ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();