This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d9c75ff1f9 [ISSUE #7872] Fix Tiered Store Query Message Async return 
different view each time (#7874)
d9c75ff1f9 is described below

commit d9c75ff1f94d0b25bd79532cc326eff5a994e47d
Author: AYue <40812847+ayue...@users.noreply.github.com>
AuthorDate: Tue Mar 26 15:12:38 2024 +0800

    [ISSUE #7872] Fix Tiered Store Query Message Async return different view 
each time (#7874)
    
    Co-authored-by: ayue <ericyu0...@163.com>
---
 .../tieredstore/core/MessageStoreFetcherImpl.java        | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 2ffad2e3f4..5403ebdc31 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -394,8 +394,7 @@ public class MessageStoreFetcherImpl implements 
MessageStoreFetcher {
             messageStore.getIndexService().queryAsync(topic, key, maxCount, 
begin, end);
 
         return future.thenCompose(indexItemList -> {
-            QueryMessageResult result = new QueryMessageResult();
-            List<CompletableFuture<Void>> futureList = new 
ArrayList<>(maxCount);
+            List<CompletableFuture<SelectMappedBufferResult>> futureList = new 
ArrayList<>(maxCount);
             for (IndexItem indexItem : indexItemList) {
                 if (topicId != indexItem.getTopicId()) {
                     continue;
@@ -405,17 +404,20 @@ public class MessageStoreFetcherImpl implements 
MessageStoreFetcher {
                 if (flatFile == null) {
                     continue;
                 }
-                CompletableFuture<Void> getMessageFuture = flatFile
+                CompletableFuture<SelectMappedBufferResult> getMessageFuture = 
flatFile
                     .getCommitLogAsync(indexItem.getOffset(), 
indexItem.getSize())
-                    .thenAccept(messageBuffer -> result.addMessage(
-                        new SelectMappedBufferResult(
-                            indexItem.getOffset(), messageBuffer, 
indexItem.getSize(), null)));
+                    .thenApply(messageBuffer -> new SelectMappedBufferResult(
+                        indexItem.getOffset(), messageBuffer, 
indexItem.getSize(), null));
                 futureList.add(getMessageFuture);
                 if (futureList.size() >= maxCount) {
                     break;
                 }
             }
-            return CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).thenApply(v -> result);
+            return CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).thenApply(v -> {
+                QueryMessageResult result = new QueryMessageResult();
+                futureList.forEach(f -> f.thenAccept(result::addMessage));
+                return result;
+            });
         }).whenComplete((result, throwable) -> {
             if (result != null) {
                 log.info("MessageFetcher#queryMessageAsync, " +

Reply via email to