This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6819b4d684 [ISSUE #9233] Fix query time boundary calculation in tiered 
storage (#9374)
6819b4d684 is described below

commit 6819b4d684785e7192cd1e86397eda88950443c9
Author: dingshuangxi888 <[email protected]>
AuthorDate: Tue Apr 29 19:41:05 2025 +0800

    [ISSUE #9233] Fix query time boundary calculation in tiered storage (#9374)
---
 .../apache/rocketmq/tieredstore/index/IndexStoreService.java   | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 75c61dcb38..7fe645da0f 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -226,13 +226,15 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
     public CompletableFuture<List<IndexItem>> queryAsync(
         String topic, String key, int maxCount, long beginTime, long endTime) {
 
+        if (beginTime > endTime) {
+            return CompletableFuture.completedFuture(new ArrayList<>());
+        }
+
         CompletableFuture<List<IndexItem>> future = new CompletableFuture<>();
         try {
             readWriteLock.readLock().lock();
-            long firstFileTimeStamp = this.timeStoreTable.lowerKey(beginTime) 
== null ?
-                this.timeStoreTable.firstKey() : 
this.timeStoreTable.lowerKey(beginTime);
             ConcurrentNavigableMap<Long, IndexFile> pendingMap =
-                this.timeStoreTable.subMap(firstFileTimeStamp, true, endTime, 
true);
+                this.timeStoreTable.subMap(beginTime, true, endTime, true);
             List<CompletableFuture<Void>> futureList = new 
ArrayList<>(pendingMap.size());
             ConcurrentHashMap<String /* queueId-offset */, IndexItem> result = 
new ConcurrentHashMap<>();
 
@@ -260,6 +262,8 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
                     }
                 });
         } catch (Exception e) {
+            log.error("IndexStoreService#queryAsync, topicId={}, key={}, 
maxCount={}, timestamp={}-{}",
+                topic, key, maxCount, beginTime, endTime, e);
             future.completeExceptionally(e);
         } finally {
             readWriteLock.readLock().unlock();

Reply via email to