This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new d9c75ff1f9 [ISSUE #7872] Fix Tiered Store Query Message Async return different view each time (#7874)
d9c75ff1f9 is described below

commit d9c75ff1f94d0b25bd79532cc326eff5a994e47d
Author: AYue <40812847+ayue...@users.noreply.github.com>
AuthorDate: Tue Mar 26 15:12:38 2024 +0800

    [ISSUE #7872] Fix Tiered Store Query Message Async return different view each time (#7874)

    Co-authored-by: ayue <ericyu0...@163.com>
---
 .../tieredstore/core/MessageStoreFetcherImpl.java | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 2ffad2e3f4..5403ebdc31 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -394,8 +394,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
             messageStore.getIndexService().queryAsync(topic, key, maxCount, begin, end);
 
         return future.thenCompose(indexItemList -> {
-            QueryMessageResult result = new QueryMessageResult();
-            List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
+            List<CompletableFuture<SelectMappedBufferResult>> futureList = new ArrayList<>(maxCount);
             for (IndexItem indexItem : indexItemList) {
                 if (topicId != indexItem.getTopicId()) {
                     continue;
@@ -405,17 +404,20 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
                 if (flatFile == null) {
                     continue;
                 }
-                CompletableFuture<Void> getMessageFuture = flatFile
+                CompletableFuture<SelectMappedBufferResult> getMessageFuture = flatFile
                     .getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
-                    .thenAccept(messageBuffer -> result.addMessage(
-                        new SelectMappedBufferResult(
-                            indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
+                    .thenApply(messageBuffer -> new SelectMappedBufferResult(
+                        indexItem.getOffset(), messageBuffer, indexItem.getSize(), null));
                 futureList.add(getMessageFuture);
                 if (futureList.size() >= maxCount) {
                     break;
                 }
             }
-            return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
+            return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> {
+                QueryMessageResult result = new QueryMessageResult();
+                futureList.forEach(f -> f.thenAccept(result::addMessage));
+                return result;
+            });
         }).whenComplete((result, throwable) -> {
             if (result != null) {
                 log.info("MessageFetcher#queryMessageAsync, " +