This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6819b4d684 [ISSUE #9233] Fix query time boundary calculation in tiered storage (#9374)
6819b4d684 is described below
commit 6819b4d684785e7192cd1e86397eda88950443c9
Author: dingshuangxi888 <[email protected]>
AuthorDate: Tue Apr 29 19:41:05 2025 +0800
[ISSUE #9233] Fix query time boundary calculation in tiered storage (#9374)
---
.../apache/rocketmq/tieredstore/index/IndexStoreService.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 75c61dcb38..7fe645da0f 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -226,13 +226,15 @@ public class IndexStoreService extends ServiceThread implements IndexService {
public CompletableFuture<List<IndexItem>> queryAsync(
String topic, String key, int maxCount, long beginTime, long endTime) {
+ if (beginTime > endTime) {
+ return CompletableFuture.completedFuture(new ArrayList<>());
+ }
+
CompletableFuture<List<IndexItem>> future = new CompletableFuture<>();
try {
readWriteLock.readLock().lock();
- long firstFileTimeStamp = this.timeStoreTable.lowerKey(beginTime) == null ?
- this.timeStoreTable.firstKey() : this.timeStoreTable.lowerKey(beginTime);
ConcurrentNavigableMap<Long, IndexFile> pendingMap =
- this.timeStoreTable.subMap(firstFileTimeStamp, true, endTime, true);
+ this.timeStoreTable.subMap(beginTime, true, endTime, true);
List<CompletableFuture<Void>> futureList = new
ArrayList<>(pendingMap.size());
ConcurrentHashMap<String /* queueId-offset */, IndexItem> result =
new ConcurrentHashMap<>();
@@ -260,6 +262,8 @@ public class IndexStoreService extends ServiceThread implements IndexService {
}
});
} catch (Exception e) {
+ log.error("IndexStoreService#queryAsync, topicId={}, key={},
maxCount={}, timestamp={}-{}",
+ topic, key, maxCount, beginTime, endTime, e);
future.completeExceptionally(e);
} finally {
readWriteLock.readLock().unlock();