This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2089abd3be [ISSUE #9080] Fix transfer logic when get large messages from cache in tiered storage (#9079)
2089abd3be is described below

commit 2089abd3be5c4f34d2a981caf32747a1e624f2ed
Author: yuz10 <845238...@qq.com>
AuthorDate: Thu Dec 26 14:49:00 2024 +0800

    [ISSUE #9080] Fix transfer logic when get large messages from cache in tiered storage (#9079)
---
 .../apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java    | 5 +++++
 .../rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java    | 1 +
 2 files changed, 6 insertions(+)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 7f79dbcd98..e94185626a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -56,6 +56,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
     private final String brokerName;
     private final MetadataStore metadataStore;
     private final MessageStoreConfig storeConfig;
+    private final org.apache.rocketmq.store.config.MessageStoreConfig messageStoreConfig;
     private final TieredMessageStore messageStore;
     private final IndexService indexService;
     private final FlatFileStore flatFileStore;
@@ -71,6 +72,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
         FlatFileStore flatFileStore, IndexService indexService) {
 
         this.storeConfig = storeConfig;
+        this.messageStoreConfig = messageStore.getMessageStoreConfig();
         this.brokerName = storeConfig.getBrokerName();
         this.flatFileStore = flatFileStore;
         this.messageStore = messageStore;
@@ -148,6 +150,9 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
             if (result.getMessageCount() == maxCount) {
                 break;
             }
+            if (result.getBufferTotalSize() >= messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
+                break;
+            }
         }
         result.setStatus(result.getMessageCount() > 0 ?
             GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
index 6b96076948..7a43e1ede8 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
@@ -106,6 +106,7 @@ public class MessageStoreDispatcherImplTest {
         Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
         Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
         Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
+        Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());
 
         // mock message
         ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();

Reply via email to