This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aec1055830 [ISSUE #7585] Support message filtering in rocketmq tiered 
storage (#7594)
aec1055830 is described below

commit aec1055830e78f7e710e32ebd467f9f7d208855d
Author: lizhimins <707364...@qq.com>
AuthorDate: Mon Dec 4 16:12:42 2023 +0800

    [ISSUE #7585] Support message filtering in rocketmq tiered storage (#7594)
---
 .../rocketmq/tieredstore/TieredMessageFetcher.java | 325 ++++++++++-----------
 .../rocketmq/tieredstore/TieredMessageStore.java   |   6 +-
 .../tieredstore/common/GetMessageResultExt.java    |  76 +++++
 .../tieredstore/common/SelectBufferResult.java     |  51 ++++
 ...Wrapper.java => SelectBufferResultWrapper.java} |  53 ++--
 .../common/TieredMessageStoreConfig.java           |   9 +
 .../metrics/TieredStoreMetricsManager.java         |   4 +-
 .../tieredstore/provider/TieredFileSegment.java    |   2 +-
 .../tieredstore/util/MessageBufferUtil.java        |  71 +++--
 .../tieredstore/TieredMessageFetcherTest.java      |   9 +-
 .../common/GetMessageResultExtTest.java            |  65 +++++
 .../tieredstore/common/SelectBufferResultTest.java |  37 +++
 .../tieredstore/util/MessageBufferUtilTest.java    |  19 +-
 13 files changed, 478 insertions(+), 249 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index f739773eb3..7b0c47c592 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -19,17 +19,14 @@ package org.apache.rocketmq.tieredstore;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
 import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -40,12 +37,13 @@ import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.tieredstore.common.GetMessageResultExt;
 import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
 import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
 import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
 import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
@@ -66,10 +64,10 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
 
     private final String brokerName;
-    private final TieredMessageStoreConfig storeConfig;
     private final TieredMetadataStore metadataStore;
+    private final TieredMessageStoreConfig storeConfig;
     private final TieredFlatFileManager flatFileManager;
-    private final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
readAheadCache;
+    private final Cache<MessageCacheKey, SelectBufferResultWrapper> 
readAheadCache;
 
     public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) {
         this.storeConfig = storeConfig;
@@ -79,7 +77,7 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
         this.readAheadCache = this.initCache(storeConfig);
     }
 
-    private Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
initCache(TieredMessageStoreConfig storeConfig) {
+    private Cache<MessageCacheKey, SelectBufferResultWrapper> 
initCache(TieredMessageStoreConfig storeConfig) {
         long memoryMaxSize =
             (long) (Runtime.getRuntime().maxMemory() * 
storeConfig.getReadAheadCacheSizeThresholdRate());
 
@@ -88,60 +86,35 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), 
TimeUnit.MILLISECONDS)
             .maximumWeight(memoryMaxSize)
             // Using the buffer size of messages to calculate memory usage
-            .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper 
msg) -> msg.getDuplicateResult().getSize())
+            .weigher((MessageCacheKey key, SelectBufferResultWrapper msg) -> 
msg.getBufferSize())
             .recordStats()
             .build();
     }
 
-    protected SelectMappedBufferResultWrapper 
putMessageToCache(CompositeFlatFile flatFile,
-        long queueOffset, SelectMappedBufferResult result, long minOffset, 
long maxOffset, int size) {
-
-        return putMessageToCache(flatFile, queueOffset, result, minOffset, 
maxOffset, size, false);
-    }
-
-    protected SelectMappedBufferResultWrapper 
putMessageToCache(CompositeFlatFile flatFile,
-        long queueOffset, SelectMappedBufferResult result, long minOffset, 
long maxOffset, int size, boolean used) {
-
-        SelectMappedBufferResultWrapper wrapper =
-            new SelectMappedBufferResultWrapper(result, queueOffset, 
minOffset, maxOffset, size);
-        if (used) {
-            wrapper.addAccessCount();
-        }
-        readAheadCache.put(new MessageCacheKey(flatFile, queueOffset), 
wrapper);
-        return wrapper;
-    }
-
-    // Visible for metrics monitor
-    public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
getMessageCache() {
+    @VisibleForTesting
+    public Cache<MessageCacheKey, SelectBufferResultWrapper> getMessageCache() 
{
         return readAheadCache;
     }
 
-    // Waiting for the request in transit to complete
-    protected CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
-        CompositeQueueFlatFile flatFile, String group, long queueOffset, int 
maxCount) {
-
-        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxCount, true);
+    protected void putMessageToCache(CompositeFlatFile flatFile, 
SelectBufferResultWrapper result) {
+        readAheadCache.put(new MessageCacheKey(flatFile, result.getOffset()), 
result);
     }
 
-    @Nullable
-    protected SelectMappedBufferResultWrapper 
getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) {
-        MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset);
-        return readAheadCache.getIfPresent(cacheKey);
+    protected SelectBufferResultWrapper getMessageFromCache(CompositeFlatFile 
flatFile, long offset) {
+        return readAheadCache.getIfPresent(new MessageCacheKey(flatFile, 
offset));
     }
 
-    protected void recordCacheAccess(CompositeFlatFile flatFile, String group, 
long queueOffset,
-        List<SelectMappedBufferResultWrapper> resultWrapperList) {
-        if (resultWrapperList.size() > 0) {
-            queueOffset = resultWrapperList.get(resultWrapperList.size() - 
1).getCurOffset();
+    protected void recordCacheAccess(CompositeFlatFile flatFile,
+        String group, long offset, List<SelectBufferResultWrapper> 
resultWrapperList) {
+        if (!resultWrapperList.isEmpty()) {
+            offset = resultWrapperList.get(resultWrapperList.size() - 
1).getOffset();
         }
-        flatFile.recordGroupAccess(group, queueOffset);
-        for (SelectMappedBufferResultWrapper wrapper : resultWrapperList) {
-            wrapper.addAccessCount();
-            if (wrapper.getAccessCount() >= flatFile.getActiveGroupCount()) {
-                MessageCacheKey cacheKey = new MessageCacheKey(flatFile, 
wrapper.getCurOffset());
-                readAheadCache.invalidate(cacheKey);
+        flatFile.recordGroupAccess(group, offset);
+        resultWrapperList.forEach(wrapper -> {
+            if (wrapper.incrementAndGet() >= flatFile.getActiveGroupCount()) {
+                readAheadCache.invalidate(new MessageCacheKey(flatFile, 
wrapper.getOffset()));
             }
-        }
+        });
     }
 
     private void prefetchMessage(CompositeQueueFlatFile flatFile, String 
group, int maxCount, long nextBeginOffset) {
@@ -149,7 +122,6 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             return;
         }
 
-        MessageQueue mq = flatFile.getMessageQueue();
         // make sure there is only one request per group and request range
         int prefetchBatchSize = Math.min(maxCount * 
flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
         InFlightRequestFuture inflightRequest = 
flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize);
@@ -166,13 +138,8 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             long maxOffsetOfLastRequest = 
inflightRequest.getLastFuture().join();
             boolean lastRequestIsExpired = getMessageFromCache(flatFile, 
nextBeginOffset) == null;
 
-            // if message fetch by last request is expired, we need to 
prefetch the message from tiered store
-            int cacheRemainCount = (int) (maxOffsetOfLastRequest - 
nextBeginOffset);
-            LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, 
nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, 
cacheRemainCount={}",
-                group, nextBeginOffset, maxOffsetOfLastRequest, 
lastRequestIsExpired, cacheRemainCount);
-
-            if (lastRequestIsExpired
-                || maxOffsetOfLastRequest != -1L && nextBeginOffset >= 
inflightRequest.getStartOffset()) {
+            if (lastRequestIsExpired ||
+                maxOffsetOfLastRequest != -1L && nextBeginOffset >= 
inflightRequest.getStartOffset()) {
 
                 long queueOffset;
                 if (lastRequestIsExpired) {
@@ -196,12 +163,12 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
                 long nextQueueOffset = queueOffset;
                 if (flag == 1) {
                     int firstBatchSize = factor % 
storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount;
-                    CompletableFuture<Long> future = 
prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
+                    CompletableFuture<Long> future = 
prefetchMessageThenPutToCache(flatFile, nextQueueOffset, firstBatchSize);
                     futureList.add(Pair.of(firstBatchSize, future));
                     nextQueueOffset += firstBatchSize;
                 }
                 for (long i = 0; i < concurrency - flag; i++) {
-                    CompletableFuture<Long> future = 
prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * 
requestBatchSize, requestBatchSize);
+                    CompletableFuture<Long> future = 
prefetchMessageThenPutToCache(flatFile, nextQueueOffset + i * requestBatchSize, 
requestBatchSize);
                     futureList.add(Pair.of(requestBatchSize, future));
                 }
                 flatFile.putInflightRequest(group, queueOffset, maxCount * 
factor, futureList);
@@ -211,52 +178,52 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
         }
     }
 
-    private CompletableFuture<Long> 
prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
-        long queueOffset, int batchSize) {
+    private CompletableFuture<Long> prefetchMessageThenPutToCache(
+        CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) {
+
+        MessageQueue mq = flatFile.getMessageQueue();
         return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
-            .thenApplyAsync(result -> {
-                if (result.getStatus() != GetMessageStatus.FOUND) {
-                    
LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed: 
topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}",
-                        mq.getTopic(), mq.getQueueId(), queueOffset, 
batchSize, result.getStatus());
-                    return -1L;
-                }
-                // put message into cache
-                List<Long> offsetList = result.getMessageQueueOffset();
-                List<SelectMappedBufferResult> msgList = 
result.getMessageMapedList();
-                if (offsetList.size() != msgList.size()) {
-                    
LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, 
result is illegal: topic: {}, queue: {}, queue offset: {}, batch size: {}, 
offsetList size: {}, msgList size: {}",
-                        mq.getTopic(), mq.getQueueId(), queueOffset, 
batchSize, offsetList.size(), msgList.size());
+            .thenApply(result -> {
+                if (result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE 
||
+                    result.getStatus() == 
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
                     return -1L;
                 }
-                if (offsetList.isEmpty()) {
-                    
LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, 
result is FOUND but msgList is empty: topic: {}, queue: {}, queue offset: {}, 
batch size: {}",
-                        mq.getTopic(), mq.getQueueId(), queueOffset, 
batchSize);
+                if (result.getStatus() != GetMessageStatus.FOUND) {
+                    LOGGER.warn("MessageFetcher prefetch message then put to 
cache failed, result: {}, " +
+                            "topic: {}, queue: {}, queue offset: {}, batch 
size: {}",
+                        result.getStatus(), mq.getTopic(), mq.getQueueId(), 
queueOffset, batchSize);
                     return -1L;
                 }
-                Long minOffset = offsetList.get(0);
-                Long maxOffset = offsetList.get(offsetList.size() - 1);
-                int size = offsetList.size();
-                for (int n = 0; n < offsetList.size(); n++) {
-                    putMessageToCache(flatFile, offsetList.get(n), 
msgList.get(n), minOffset, maxOffset, size);
-                }
-                if (size != batchSize || maxOffset != queueOffset + batchSize 
- 1) {
-                    
LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: size not match: 
except: {}, actual: {}, queue offset: {}, min offset: {}, except offset: {}, 
max offset: {}",
-                        batchSize, size, queueOffset, minOffset, queueOffset + 
batchSize - 1, maxOffset);
+                try {
+                    List<Long> offsetList = result.getMessageQueueOffset();
+                    List<Long> tagCodeList = result.getTagCodeList();
+                    List<SelectMappedBufferResult> msgList = 
result.getMessageMapedList();
+                    for (int i = 0; i < offsetList.size(); i++) {
+                        SelectMappedBufferResult msg = msgList.get(i);
+                        SelectBufferResultWrapper bufferResult = new 
SelectBufferResultWrapper(
+                            msg, offsetList.get(i), tagCodeList.get(i), false);
+                        this.putMessageToCache(flatFile, bufferResult);
+                    }
+                    return offsetList.get(offsetList.size() - 1);
+                } catch (Exception e) {
+                    LOGGER.error("MessageFetcher prefetch message then put to 
cache failed, " +
+                            "topic: {}, queue: {}, queue offset: {}, batch 
size: {}",
+                        mq.getTopic(), mq.getQueueId(), queueOffset, 
batchSize, e);
                 }
-                return maxOffset;
-            }, TieredStoreExecutor.fetchDataExecutor);
+                return -1L;
+            });
     }
 
-    public CompletableFuture<GetMessageResult> 
getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
+    public CompletableFuture<GetMessageResultExt> 
getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
         String group, long queueOffset, int maxCount, boolean 
waitInflightRequest) {
 
         MessageQueue mq = flatFile.getMessageQueue();
 
         long lastGetOffset = queueOffset - 1;
-        List<SelectMappedBufferResultWrapper> resultWrapperList = new 
ArrayList<>(maxCount);
+        List<SelectBufferResultWrapper> resultWrapperList = new 
ArrayList<>(maxCount);
         for (int i = 0; i < maxCount; i++) {
             lastGetOffset++;
-            SelectMappedBufferResultWrapper wrapper = 
getMessageFromCache(flatFile, lastGetOffset);
+            SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, 
lastGetOffset);
             if (wrapper == null) {
                 lastGetOffset--;
                 break;
@@ -281,19 +248,19 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
                 flatFile.getInflightRequest(group, queueOffset, 
maxCount).getFuture(queueOffset);
             if (!future.isDone()) {
                 Stopwatch stopwatch = Stopwatch.createStarted();
-                // to prevent starvation issues, only allow waiting for 
inflight request once
-                return future.thenCompose(v -> {
+                // to prevent starvation issues, only allow waiting for 
processing request once
+                return future.thenComposeAsync(v -> {
                     LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: 
wait for response cost: {}ms",
                         stopwatch.elapsed(TimeUnit.MILLISECONDS));
                     return getMessageFromCacheAsync(flatFile, group, 
queueOffset, maxCount, false);
-                });
+                }, TieredStoreExecutor.fetchDataExecutor);
             }
         }
 
         // try to get message from cache again when prefetch request is done
         for (int i = 0; i < maxCount - resultWrapperList.size(); i++) {
             lastGetOffset++;
-            SelectMappedBufferResultWrapper wrapper = 
getMessageFromCache(flatFile, lastGetOffset);
+            SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, 
lastGetOffset);
             if (wrapper == null) {
                 lastGetOffset--;
                 break;
@@ -303,74 +270,94 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
 
         recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
 
-        // if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
-        if (!resultWrapperList.isEmpty()) {
-            LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: 
" +
-                    "topic: {}, queue: {}, queue offset: {}, max message num: 
{}, cache hit num: {}",
-                mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
-            prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+        if (resultWrapperList.isEmpty()) {
+            // If cache miss, pull messages immediately
+            LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}",
+                group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+        } else {
+            // If cache hit, return buffer result immediately and 
asynchronously prefetch messages
+            LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+                group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
 
-            GetMessageResult result = new GetMessageResult();
+            GetMessageResultExt result = new GetMessageResultExt();
             result.setStatus(GetMessageStatus.FOUND);
             result.setMinOffset(flatFile.getConsumeQueueMinOffset());
             result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
             result.setNextBeginOffset(queueOffset + resultWrapperList.size());
-            resultWrapperList.forEach(wrapper -> 
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+            resultWrapperList.forEach(wrapper -> result.addMessageExt(
+                wrapper.getDuplicateResult(), wrapper.getOffset(), 
wrapper.getTagCode()));
+
+            if (lastGetOffset < result.getMaxOffset()) {
+                this.prefetchMessage(flatFile, group, maxCount, lastGetOffset 
+ 1);
+            }
             return CompletableFuture.completedFuture(result);
         }
 
-        // if cache is miss, immediately pull messages
-        LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: " +
-                "topic: {}, queue: {}, queue offset: {}, max message num: {}",
-            mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
-        CompletableFuture<GetMessageResult> resultFuture;
+        CompletableFuture<GetMessageResultExt> resultFuture;
         synchronized (flatFile) {
             int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
             resultFuture = getMessageFromTieredStoreAsync(flatFile, 
queueOffset, batchSize)
-                .thenApplyAsync(result -> {
+                .thenApply(result -> {
                     if (result.getStatus() != GetMessageStatus.FOUND) {
                         return result;
                     }
-                    GetMessageResult newResult = new GetMessageResult();
-                    newResult.setStatus(GetMessageStatus.FOUND);
-                    
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-                    
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 
+                    GetMessageResultExt newResult = new GetMessageResultExt();
                     List<Long> offsetList = result.getMessageQueueOffset();
+                    List<Long> tagCodeList = result.getTagCodeList();
                     List<SelectMappedBufferResult> msgList = 
result.getMessageMapedList();
-                    Long minOffset = offsetList.get(0);
-                    Long maxOffset = offsetList.get(offsetList.size() - 1);
-                    int size = offsetList.size();
+
                     for (int i = 0; i < offsetList.size(); i++) {
-                        Long offset = offsetList.get(i);
                         SelectMappedBufferResult msg = msgList.get(i);
-                        // put message into cache
-                        SelectMappedBufferResultWrapper resultWrapper = 
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-                        // try to meet maxCount
+                        SelectBufferResultWrapper bufferResult = new 
SelectBufferResultWrapper(
+                            msg, offsetList.get(i), tagCodeList.get(i), true);
+                        this.putMessageToCache(flatFile, bufferResult);
                         if (newResult.getMessageMapedList().size() < maxCount) 
{
-                            
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+                            newResult.addMessageExt(msg, offsetList.get(i), 
tagCodeList.get(i));
                         }
                     }
+
+                    newResult.setStatus(GetMessageStatus.FOUND);
+                    
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
+                    
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
                     newResult.setNextBeginOffset(queueOffset + 
newResult.getMessageMapedList().size());
                     return newResult;
-                }, TieredStoreExecutor.fetchDataExecutor);
+                });
 
             List<Pair<Integer, CompletableFuture<Long>>> futureList = new 
ArrayList<>();
             CompletableFuture<Long> inflightRequestFuture = 
resultFuture.thenApply(result ->
-                result.getStatus() == GetMessageStatus.FOUND ? 
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : 
-1L);
+                result.getStatus() == GetMessageStatus.FOUND ?
+                    
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : 
-1L);
             futureList.add(Pair.of(batchSize, inflightRequestFuture));
             flatFile.putInflightRequest(group, queueOffset, batchSize, 
futureList);
         }
         return resultFuture;
     }
 
-    public CompletableFuture<GetMessageResult> 
getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile,
-        long queueOffset, int batchSize) {
+    public CompletableFuture<GetMessageResultExt> 
getMessageFromTieredStoreAsync(
+        CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) {
 
-        GetMessageResult result = new GetMessageResult();
+        GetMessageResultExt result = new GetMessageResultExt();
         result.setMinOffset(flatFile.getConsumeQueueMinOffset());
         result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+
+        if (queueOffset < result.getMaxOffset()) {
+            batchSize = Math.min(batchSize, (int) 
Math.min(result.getMaxOffset() - queueOffset, Integer.MAX_VALUE));
+        } else if (queueOffset == result.getMaxOffset()) {
+            result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
+            result.setNextBeginOffset(queueOffset);
+            return CompletableFuture.completedFuture(result);
+        } else if (queueOffset > result.getMaxOffset()) {
+            result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
+            result.setNextBeginOffset(result.getMaxOffset());
+            return CompletableFuture.completedFuture(result);
+        }
+
+        LOGGER.info("MessageFetcher#getMessageFromTieredStoreAsync, " +
+                "topic: {}, queueId: {}, broker offset: {}-{}, offset: {}, 
expect: {}",
+            flatFile.getMessageQueue().getTopic(), 
flatFile.getMessageQueue().getQueueId(),
+            result.getMinOffset(), result.getMaxOffset(), queueOffset, 
batchSize);
+
         CompletableFuture<ByteBuffer> readConsumeQueueFuture;
         try {
             readConsumeQueueFuture = 
flatFile.getConsumeQueueAsync(queueOffset, batchSize);
@@ -389,66 +376,56 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             }
         }
 
-        CompletableFuture<ByteBuffer> readCommitLogFuture = 
readConsumeQueueFuture.thenComposeAsync(cqBuffer -> {
+        CompletableFuture<ByteBuffer> readCommitLogFuture = 
readConsumeQueueFuture.thenCompose(cqBuffer -> {
             long firstCommitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqBuffer);
             cqBuffer.position(cqBuffer.remaining() - 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
             long lastCommitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqBuffer);
             if (lastCommitLogOffset < firstCommitLogOffset) {
-                MessageQueue mq = flatFile.getMessageQueue();
-                
LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: message is 
not in order, try to fetch data in next store, topic: {}, queueId: {}, batch 
size: {}, queue offset {}",
-                    mq.getTopic(), mq.getQueueId(), batchSize, queueOffset);
-                throw new 
TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "message is not in 
order");
+                LOGGER.error("MessageFetcher#getMessageFromTieredStoreAsync, " 
+
+                        "last offset is smaller than first offset, " +
+                        "topic: {} queueId: {}, offset: {}, firstOffset: {}, 
lastOffset: {}",
+                    flatFile.getMessageQueue().getTopic(), 
flatFile.getMessageQueue().getQueueId(), queueOffset,
+                    firstCommitLogOffset, lastCommitLogOffset);
+                return 
CompletableFuture.completedFuture(ByteBuffer.allocate(0));
             }
-            long length = lastCommitLogOffset - firstCommitLogOffset + 
CQItemBufferUtil.getSize(cqBuffer);
 
-            // prevent OOM
-            long originLength = length;
-            while (cqBuffer.limit() > 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && length > 
storeConfig.getReadAheadMessageSizeThreshold()) {
+            // Get the total size of the data by reducing the length limit of 
cq to prevent OOM
+            long length = lastCommitLogOffset - firstCommitLogOffset + 
CQItemBufferUtil.getSize(cqBuffer);
+            while (cqBuffer.limit() > 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE &&
+                length > storeConfig.getReadAheadMessageSizeThreshold()) {
                 cqBuffer.limit(cqBuffer.position());
                 cqBuffer.position(cqBuffer.limit() - 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
-                length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) - 
firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer);
-            }
-
-            if (originLength != length) {
-                MessageQueue mq = flatFile.getMessageQueue();
-                
LOGGER.info("TieredMessageFetcher#getMessageFromTieredStoreAsync: msg data is 
too large, topic: {}, queueId: {}, batch size: {}, fix it from {} to {}",
-                    mq.getTopic(), mq.getQueueId(), batchSize, originLength, 
length);
+                length = CQItemBufferUtil.getCommitLogOffset(cqBuffer)
+                    - firstCommitLogOffset + 
CQItemBufferUtil.getSize(cqBuffer);
             }
 
             return flatFile.getCommitLogAsync(firstCommitLogOffset, (int) 
length);
-        }, TieredStoreExecutor.fetchDataExecutor);
+        });
 
-        return readConsumeQueueFuture.thenCombineAsync(readCommitLogFuture, 
(cqBuffer, msgBuffer) -> {
-            List<Pair<Integer, Integer>> msgList = 
MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
-            if (!msgList.isEmpty()) {
-                int requestSize = cqBuffer.remaining() / 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
+        int finalBatchSize = batchSize;
+        return readConsumeQueueFuture.thenCombine(readCommitLogFuture, 
(cqBuffer, msgBuffer) -> {
+            List<SelectBufferResult> bufferList = 
MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+            int requestSize = cqBuffer.remaining() / 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
+            if (bufferList.isEmpty()) {
+                result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
+                result.setNextBeginOffset(queueOffset + requestSize);
+            } else {
                 result.setStatus(GetMessageStatus.FOUND);
-                result.setNextBeginOffset(queueOffset + msgList.size());
-                msgList.forEach(pair -> {
-                    msgBuffer.position(pair.getLeft());
-                    ByteBuffer slice = msgBuffer.slice();
-                    slice.limit(pair.getRight());
-                    result.addMessage(new 
SelectMappedBufferResult(pair.getLeft(), slice, pair.getRight(), null), 
MessageBufferUtil.getQueueOffset(slice));
-                });
-                if (requestSize != msgList.size()) {
-                    Set<Long> requestOffsetSet = new HashSet<>();
-                    for (int i = 0; i < requestSize; i++) {
-                        requestOffsetSet.add(queueOffset + i);
-                    }
-                    
LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split 
message buffer failed, batch size: {}, request message count: {}, actual 
message count: {}, these messages may lost: {}", batchSize, requestSize, 
msgList.size(), Sets.difference(requestOffsetSet, 
Sets.newHashSet(result.getMessageQueueOffset())));
-                } else if (requestSize != batchSize) {
-                    
LOGGER.debug("TieredMessageFetcher#getMessageFromTieredStoreAsync: message 
count does not meet batch size, maybe dispatch delay: batch size: {}, request 
message count: {}", batchSize, requestSize);
+                result.setNextBeginOffset(queueOffset + requestSize);
+
+                for (SelectBufferResult bufferResult : bufferList) {
+                    ByteBuffer slice = bufferResult.getByteBuffer().slice();
+                    slice.limit(bufferResult.getSize());
+                    SelectMappedBufferResult msg = new 
SelectMappedBufferResult(bufferResult.getStartOffset(),
+                        bufferResult.getByteBuffer(), bufferResult.getSize(), 
null);
+                    result.addMessageExt(msg, 
MessageBufferUtil.getQueueOffset(slice), bufferResult.getTagCode());
                 }
-                return result;
             }
-            long nextBeginOffset = queueOffset + cqBuffer.remaining() / 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
-            LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: 
split message buffer failed, consume queue buffer size: {}, message buffer 
size: {}, change offset from {} to {}", cqBuffer.remaining(), 
msgBuffer.remaining(), queueOffset, nextBeginOffset);
-            result.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
-            result.setNextBeginOffset(nextBeginOffset);
             return result;
-        }, TieredStoreExecutor.fetchDataExecutor).exceptionally(e -> {
+        }).exceptionally(e -> {
             MessageQueue mq = flatFile.getMessageQueue();
-            LOGGER.warn("TieredMessageFetcher#getMessageFromTieredStoreAsync: 
get message failed: topic: {} queueId: {}", mq.getTopic(), mq.getQueueId(), e);
+            LOGGER.warn("MessageFetcher#getMessageFromTieredStoreAsync failed, 
" +
+                "topic: {} queueId: {}, offset: {}, batchSize: {}", 
mq.getTopic(), mq.getQueueId(), queueOffset, finalBatchSize, e);
             result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
             result.setNextBeginOffset(queueOffset);
             return result;
@@ -498,7 +475,8 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             return CompletableFuture.completedFuture(result);
         }
 
-        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxCount);
+        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxCount, true)
+            .thenApply(messageResultExt -> 
messageResultExt.doFilterMessage(messageFilter));
     }
 
     @Override
@@ -546,7 +524,7 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             return flatFile.getOffsetInConsumeQueueByTime(timestamp, type);
         } catch (Exception e) {
             LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " +
-                "get offset in queue by time failed: topic: {}, queue: {}, 
timestamp: {}, type: {}",
+                    "get offset in queue by time failed: topic: {}, queue: {}, 
timestamp: {}, type: {}",
                 topic, queueId, timestamp, type, e);
         }
         return -1L;
@@ -598,7 +576,8 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             return CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).thenApply(v -> result);
         }).whenComplete((result, throwable) -> {
             if (result != null) {
-                LOGGER.info("MessageFetcher#queryMessageAsync, query result: 
{}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
+                LOGGER.info("MessageFetcher#queryMessageAsync, " +
+                        "query result: {}, topic: {}, topicId: {}, key: {}, 
maxCount: {}, timestamp: {}-{}",
                     result.getMessageBufferList().size(), topic, topicId, key, 
maxCount, begin, end);
             }
         });
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index edaa5d19f6..015c27efae 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -213,8 +213,10 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 // so there is no need to update the maximum offset to the 
local cq offset here,
                 // otherwise it will cause repeated consumption after next 
begin offset over commit offset.
 
-                logger.trace("GetMessageAsync result, group: {}, topic: {}, 
queueId: {}, offset: {}, count:{}, {}",
-                    group, topic, queueId, offset, maxMsgNums, result);
+                if (storeConfig.isRecordGetMessageResult()) {
+                    logger.info("GetMessageAsync result, {}, group: {}, topic: 
{}, queueId: {}, offset: {}, count:{}",
+                        result, group, topic, queueId, offset, maxMsgNums);
+                }
 
                 return result;
             }).exceptionally(e -> {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
new file mode 100644
index 0000000000..52462b5dc5
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+
+public class GetMessageResultExt extends GetMessageResult {
+
+    private final List<Long> tagCodeList;
+
+    public GetMessageResultExt() {
+        this.tagCodeList = new ArrayList<>();
+    }
+
+    public void addMessageExt(SelectMappedBufferResult bufferResult, long 
queueOffset, long tagCode) {
+        super.addMessage(bufferResult, queueOffset);
+        this.tagCodeList.add(tagCode);
+    }
+
+    public List<Long> getTagCodeList() {
+        return tagCodeList;
+    }
+
+    public GetMessageResult doFilterMessage(MessageFilter messageFilter) {
+        if (GetMessageStatus.FOUND != super.getStatus() || messageFilter == 
null) {
+            return this;
+        }
+
+        GetMessageResult result = new GetMessageResult();
+        result.setStatus(GetMessageStatus.FOUND);
+        result.setMinOffset(this.getMinOffset());
+        result.setMaxOffset(this.getMaxOffset());
+        result.setNextBeginOffset(this.getNextBeginOffset());
+
+        for (int i = 0; i < this.getMessageMapedList().size(); i++) {
+            if 
(!messageFilter.isMatchedByConsumeQueue(this.tagCodeList.get(i), null)) {
+                continue;
+            }
+
+            SelectMappedBufferResult bufferResult = 
this.getMessageMapedList().get(i);
+            if 
(!messageFilter.isMatchedByCommitLog(bufferResult.getByteBuffer().slice(), 
null)) {
+                continue;
+            }
+
+            result.addMessage(new 
SelectMappedBufferResult(bufferResult.getStartOffset(),
+                    bufferResult.getByteBuffer(), bufferResult.getSize(), 
null),
+                
MessageBufferUtil.getQueueOffset(bufferResult.getByteBuffer()));
+        }
+
+        if (result.getBufferTotalSize() == 0) {
+            result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE);
+        }
+        return result;
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
new file mode 100644
index 0000000000..d265ed0fc4
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+
+public class SelectBufferResult {
+
+    private final ByteBuffer byteBuffer;
+    private final long startOffset;
+    private final int size;
+    private final long tagCode;
+
+    public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int 
size, long tagCode) {
+        this.startOffset = startOffset;
+        this.byteBuffer = byteBuffer;
+        this.size = size;
+        this.tagCode = tagCode;
+    }
+
+    public ByteBuffer getByteBuffer() {
+        return byteBuffer;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public int getSize() {
+        return size;
+    }
+
+    public long getTagCode() {
+        return tagCode;
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
similarity index 55%
rename from 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
rename to 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
index af0785f712..4f9f00a074 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java
@@ -16,32 +16,21 @@
  */
 package org.apache.rocketmq.tieredstore.common;
 
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
-public class SelectMappedBufferResultWrapper {
+public class SelectBufferResultWrapper {
 
     private final SelectMappedBufferResult result;
-    private final LongAdder accessCount;
-
-    private final long curOffset;
-    private final long minOffset;
-    private final long maxOffset;
-    private final long size;
-
-    public SelectMappedBufferResultWrapper(
-        SelectMappedBufferResult result, long curOffset, long minOffset, long 
maxOffset, long size) {
+    private final long offset;
+    private final long tagCode;
+    private final AtomicInteger accessCount;
 
+    public SelectBufferResultWrapper(SelectMappedBufferResult result, long 
offset, long tagCode, boolean used) {
         this.result = result;
-        this.accessCount = new LongAdder();
-        this.curOffset = curOffset;
-        this.minOffset = minOffset;
-        this.maxOffset = maxOffset;
-        this.size = size;
-    }
-
-    public SelectMappedBufferResult getResult() {
-        return result;
+        this.offset = offset;
+        this.tagCode = tagCode;
+        this.accessCount = new AtomicInteger(used ? 1 : 0);
     }
 
     public SelectMappedBufferResult getDuplicateResult() {
@@ -53,27 +42,23 @@ public class SelectMappedBufferResultWrapper {
             result.getMappedFile());
     }
 
-    public long getCurOffset() {
-        return curOffset;
-    }
-
-    public long getMinOffset() {
-        return minOffset;
+    public long getOffset() {
+        return offset;
     }
 
-    public long getMaxOffset() {
-        return maxOffset;
+    public int getBufferSize() {
+        return this.result.getSize();
     }
 
-    public long getSize() {
-        return size;
+    public long getTagCode() {
+        return tagCode;
     }
 
-    public void addAccessCount() {
-        accessCount.increment();
+    public int incrementAndGet() {
+        return accessCount.incrementAndGet();
     }
 
-    public long getAccessCount() {
-        return accessCount.sum();
+    public int getAccessCount() {
+        return accessCount.get();
     }
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index 595db6b865..b0750e5509 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -82,6 +82,7 @@ public class TieredMessageStoreConfig {
 
     private String storePathRootDir = System.getProperty("user.home") + 
File.separator + "store";
     private boolean messageIndexEnable = true;
+    private boolean recordGetMessageResult = false;
 
     // CommitLog file size, default is 1G
     private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024;
@@ -182,6 +183,14 @@ public class TieredMessageStoreConfig {
         this.messageIndexEnable = messageIndexEnable;
     }
 
+    public boolean isRecordGetMessageResult() {
+        return recordGetMessageResult;
+    }
+
+    public void setRecordGetMessageResult(boolean recordGetMessageResult) {
+        this.recordGetMessageResult = recordGetMessageResult;
+    }
+
     public long getTieredStoreCommitLogMaxSize() {
         return tieredStoreCommitLogMaxSize;
     }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index d8a07f0a75..2b9fc59d82 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -46,7 +46,7 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
 import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
@@ -265,7 +265,7 @@ public class TieredStoreMetricsManager {
             .setUnit("bytes")
             .ofLongs()
             .buildWithCallback(measurement -> {
-                Optional<Policy.Eviction<MessageCacheKey, 
SelectMappedBufferResultWrapper>> eviction = 
fetcher.getMessageCache().policy().eviction();
+                Optional<Policy.Eviction<MessageCacheKey, 
SelectBufferResultWrapper>> eviction = 
fetcher.getMessageCache().policy().eviction();
                 eviction.ifPresent(resultEviction -> 
measurement.record(resultEviction.weightedSize().orElse(0), 
newAttributesBuilder().build()));
             });
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index aad42de98d..5e3d8c5624 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -295,7 +295,7 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
             return future;
         }
         if (position + length > commitPosition) {
-            logger.warn("TieredFileSegment#readAsync request position + length 
is greater than commit position," +
+            logger.debug("TieredFileSegment#readAsync request position + 
length is greater than commit position," +
                     " correct length using commit position, file: {}, request 
position: {}, commit position:{}, change length from {} to {}",
                 getPath(), position, commitPosition, length, commitPosition - 
position);
             length = (int) (commitPosition - position);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
index 6db45a7479..2c4a6e5784 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java
@@ -20,11 +20,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
 import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
 import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
 
@@ -113,53 +113,72 @@ public class MessageBufferUtil {
         return MessageDecoder.decodeProperties(slice);
     }
 
-    public static List<Pair<Integer/* offset of msgBuffer */, Integer/* msg 
size */>> splitMessageBuffer(
-        ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
+    public static List<SelectBufferResult> splitMessageBuffer(ByteBuffer 
cqBuffer, ByteBuffer msgBuffer) {
+
         cqBuffer.rewind();
         msgBuffer.rewind();
-        List<Pair<Integer, Integer>> messageList = new 
ArrayList<>(cqBuffer.remaining() / 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+
+        List<SelectBufferResult> bufferResultList = new ArrayList<>(
+            cqBuffer.remaining() / 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+
+        if (msgBuffer.remaining() == 0) {
+            logger.error("MessageBufferUtil#splitMessage, msg buffer length is 
zero");
+            return bufferResultList;
+        }
+
         if (cqBuffer.remaining() % 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
-            logger.warn("MessageBufferUtil#splitMessage: consume queue buffer 
size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}",
-                cqBuffer.remaining(), 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
-            return messageList;
+            logger.error("MessageBufferUtil#splitMessage, consume queue buffer 
size incorrect, {}", cqBuffer.remaining());
+            return bufferResultList;
         }
+
         try {
-            long startCommitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqBuffer);
-            for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
-                cqBuffer.position(pos);
-                int diff = (int) 
(CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset);
-                int size = CQItemBufferUtil.getSize(cqBuffer);
-                if (diff + size > msgBuffer.limit()) {
-                    logger.error("MessageBufferUtil#splitMessage: message 
buffer size is incorrect: record in consume queue: {}, actual: {}", diff + 
size, msgBuffer.remaining());
-                    return messageList;
+            long firstCommitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqBuffer);
+
+            for (int position = cqBuffer.position(); position < 
cqBuffer.limit();
+                position += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
+
+                cqBuffer.position(position);
+                long logOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
+                int bufferSize = CQItemBufferUtil.getSize(cqBuffer);
+                long tagCode = CQItemBufferUtil.getTagCode(cqBuffer);
+
+                int offset = (int) (logOffset - firstCommitLogOffset);
+                if (offset + bufferSize > msgBuffer.limit()) {
+                    logger.error("MessageBufferUtil#splitMessage, message 
buffer size incorrect. " +
+                        "Expect length in consume queue: {}, actual length: 
{}", offset + bufferSize, msgBuffer.limit());
+                    break;
                 }
-                msgBuffer.position(diff);
 
+                msgBuffer.position(offset);
                 int magicCode = getMagicCode(msgBuffer);
                 if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) {
-                    logger.warn("MessageBufferUtil#splitMessage: message 
decode error: blank magic code, this message may be coda, try to fix offset");
-                    diff = diff + TieredCommitLog.CODA_SIZE;
-                    msgBuffer.position(diff);
+                    offset += TieredCommitLog.CODA_SIZE;
+                    msgBuffer.position(offset);
                     magicCode = getMagicCode(msgBuffer);
                 }
-                if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && 
magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
-                    logger.warn("MessageBufferUtil#splitMessage: message 
decode error: unknown magic code");
+                if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE &&
+                    magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
+                    logger.warn("MessageBufferUtil#splitMessage, found unknown 
magic code. " +
+                        "Message offset: {}, wrong magic code: {}", offset, 
magicCode);
                     continue;
                 }
 
-                if (getTotalSize(msgBuffer) != size) {
-                    logger.warn("MessageBufferUtil#splitMessage: message size 
is not right: except: {}, actual: {}", size, getTotalSize(msgBuffer));
+                if (bufferSize != getTotalSize(msgBuffer)) {
+                    logger.warn("MessageBufferUtil#splitMessage, message 
length in commitlog incorrect. " +
+                        "Except length in commitlog: {}, actual: {}", 
getTotalSize(msgBuffer), bufferSize);
                     continue;
                 }
 
-                messageList.add(Pair.of(diff, size));
+                ByteBuffer sliceBuffer = msgBuffer.slice();
+                sliceBuffer.limit(bufferSize);
+                bufferResultList.add(new SelectBufferResult(sliceBuffer, 
offset, bufferSize, tagCode));
             }
         } catch (Exception e) {
-            logger.error("MessageBufferUtil#splitMessage: split message 
failed, maybe decode consume queue item failed", e);
+            logger.error("MessageBufferUtil#splitMessage, split message buffer 
error", e);
         } finally {
             cqBuffer.rewind();
             msgBuffer.rewind();
         }
-        return messageList;
+        return bufferResultList;
     }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 4e0d7e6979..4e8287533f 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
@@ -143,17 +143,18 @@ public class TieredMessageFetcherTest {
 
         fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new 
ArrayList<>());
         Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
-        fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, 
msg1, msg1.remaining(), null), 0, 0, 1);
+        SelectMappedBufferResult bufferResult = new 
SelectMappedBufferResult(0, msg1, msg1.remaining(), null);
+        fetcher.putMessageToCache(flatFile, new 
SelectBufferResultWrapper(bufferResult, 0, 0, false));
         Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
 
-        GetMessageResult getMessageResult = 
fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join();
+        GetMessageResult getMessageResult = 
fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32, true).join();
         Assert.assertEquals(GetMessageStatus.FOUND, 
getMessageResult.getStatus());
         Assert.assertEquals(1, getMessageResult.getMessageBufferList().size());
         Assert.assertEquals(msg1, 
getMessageResult.getMessageBufferList().get(0));
 
         Awaitility.waitAtMost(3, TimeUnit.SECONDS)
             .until(() -> fetcher.getMessageCache().estimatedSize() == 2);
-        ArrayList<SelectMappedBufferResultWrapper> wrapperList = new 
ArrayList<>();
+        ArrayList<SelectBufferResultWrapper> wrapperList = new ArrayList<>();
         wrapperList.add(fetcher.getMessageFromCache(flatFile, 0));
         fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, 
wrapperList);
         Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
new file mode 100644
index 0000000000..deb8770d28
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExtTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GetMessageResultExtTest {
+
+    @Test
+    public void doFilterTest() {
+        GetMessageResultExt resultExt = new GetMessageResultExt();
+        Assert.assertEquals(0, 
resultExt.doFilterMessage(null).getMessageCount());
+        resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
+        Assert.assertEquals(0, 
resultExt.doFilterMessage(null).getMessageCount());
+        resultExt.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
+        Assert.assertEquals(0, 
resultExt.doFilterMessage(null).getMessageCount());
+
+        resultExt.addMessageExt(new SelectMappedBufferResult(
+                1000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, 
null),
+            0, "TagA".hashCode());
+        resultExt.addMessageExt(new SelectMappedBufferResult(
+                2000L, MessageBufferUtilTest.buildMockedMessageBuffer(), 100, 
null),
+            0, "TagB".hashCode());
+        assertEquals(2, resultExt.getMessageCount());
+
+        resultExt.setStatus(GetMessageStatus.FOUND);
+        GetMessageResult getMessageResult = resultExt.doFilterMessage(new 
MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, 
ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return false;
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, 
Map<String, String> properties) {
+                return false;
+            }
+        });
+        Assert.assertEquals(0, getMessageResult.getMessageCount());
+    }
+}
\ No newline at end of file
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
new file mode 100644
index 0000000000..b7e6e639f0
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SelectBufferResultTest {
+    @Test
+    public void testSelectBufferResult() {
+        ByteBuffer buffer = ByteBuffer.allocate(10);
+        long startOffset = 5L;
+        int size = 10;
+        long tagCode = 1L;
+
+        SelectBufferResult result = new SelectBufferResult(buffer, 
startOffset, size, tagCode);
+        Assert.assertEquals(buffer, result.getByteBuffer());
+        Assert.assertEquals(startOffset, result.getStartOffset());
+        Assert.assertEquals(size, result.getSize());
+        Assert.assertEquals(tagCode, result.getTagCode());
+    }
+}
\ No newline at end of file
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 68277cacc5..a0b4389481 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -22,9 +22,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
 import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
 import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
 import org.junit.Assert;
@@ -206,10 +206,12 @@ public class MessageBufferUtilTest {
         cqBuffer.flip();
         cqBuffer1.rewind();
         cqBuffer2.rewind();
-        List<Pair<Integer, Integer>> msgList = 
MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+        List<SelectBufferResult> msgList = 
MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
         Assert.assertEquals(2, msgList.size());
-        Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
-        Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, 
MSG_LEN), msgList.get(1));
+        Assert.assertEquals(0, msgList.get(0).getStartOffset());
+        Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
+        Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, 
msgList.get(1).getStartOffset());
+        Assert.assertEquals(MSG_LEN, msgList.get(1).getSize());
 
         cqBuffer = 
ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
         cqBuffer.put(cqBuffer1);
@@ -219,7 +221,8 @@ public class MessageBufferUtilTest {
         cqBuffer4.rewind();
         msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
         Assert.assertEquals(1, msgList.size());
-        Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+        Assert.assertEquals(0, msgList.get(0).getStartOffset());
+        Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
 
         cqBuffer = 
ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3);
         cqBuffer.put(cqBuffer1);
@@ -227,8 +230,10 @@ public class MessageBufferUtilTest {
         cqBuffer.flip();
         msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
         Assert.assertEquals(2, msgList.size());
-        Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
-        Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, 
MSG_LEN), msgList.get(1));
+        Assert.assertEquals(0, msgList.get(0).getStartOffset());
+        Assert.assertEquals(MSG_LEN, msgList.get(0).getSize());
+        Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, 
msgList.get(1).getStartOffset());
+        Assert.assertEquals(MSG_LEN, msgList.get(1).getSize());
 
         cqBuffer = 
ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
         cqBuffer.put(cqBuffer5);

Reply via email to