This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new c080e6bbf2 [ISSUE #8438] Fix broker return two messages when query message and index service bug (#8439)
c080e6bbf2 is described below

commit c080e6bbf208bc91f06ba97beb37241d7d0c20a0
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Jul 25 11:38:31 2024 +0800

    [ISSUE #8438] Fix broker return two messages when query message and index service bug (#8439)
---
 .../main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java | 3 +++
 .../java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java | 1 +
 .../java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java | 4 +++-
 3 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 99d586ae23..9a25f85a6b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -460,6 +460,9 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
         if (flatFileStore != null) {
             flatFileStore.shutdown();
         }
+        if (indexService != null) {
+            indexService.shutdown();
+        }
         if (storeExecutor != null) {
             storeExecutor.shutdown();
         }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index b9ba80d08d..0c20a1cfb4 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -90,6 +90,7 @@ public class FlatAppendFile {
     public void initOffset(long offset) {
         if (this.fileSegmentTable.isEmpty()) {
             FileSegment fileSegment = fileSegmentFactory.createSegment(fileType, filePath, offset);
+            fileSegment.initPosition(fileSegment.getSize());
             this.flushFileSegmentMeta(fileSegment);
             this.fileSegmentTable.add(fileSegment);
         }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 180399332e..f9604b43e6 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -287,7 +287,9 @@ public class IndexStoreFile implements IndexFile {
                 buffer.position(this.getItemPosition(slotValue));
                 buffer.get(bytes);
                 IndexItem indexItem = new IndexItem(bytes);
-                if (hashCode == indexItem.getHashCode()) {
+                long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
+                if (hashCode == indexItem.getHashCode() &&
+                    beginTime <= storeTimestamp && storeTimestamp <= endTime) {
                     result.add(indexItem);
                     if (result.size() > maxCount) {
                         break;