This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 729275c0c7 [ISSUE #9271] Enhance tiered storage 
getQueueOffsetByTimeAsync (#9272)
729275c0c7 is described below

commit 729275c0c775a722d2f04fea4b81e251c18bb323
Author: bxfjb <48467309+bx...@users.noreply.github.com>
AuthorDate: Thu Mar 27 13:57:47 2025 +0800

    [ISSUE #9271] Enhance tiered storage getQueueOffsetByTimeAsync (#9272)
---
 .../rocketmq/tieredstore/file/FlatMessageFile.java | 22 ++++++++++++--
 .../tieredstore/file/FlatMessageFileTest.java      | 34 ++++++++++++++++++----
 2 files changed, 48 insertions(+), 8 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index ade37149d6..2519f91ebe 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
 import org.apache.rocketmq.tieredstore.metadata.entity.QueueMetadata;
 import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
+import org.apache.rocketmq.tieredstore.provider.FileSegment;
 import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
 import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
 import org.slf4j.Logger;
@@ -302,9 +303,26 @@ public class FlatMessageFile implements FlatFileInterface {
             return CompletableFuture.completedFuture(cqMin);
         }
 
+        // get correct consume queue file by binary search
+        List<FileSegment> consumeQueueFileList = 
this.consumeQueue.getFileSegmentList();
+        int low = 0, high = consumeQueueFileList.size() - 1;
+        int mid = low + (high - low) / 2;
+        while (low <= high) {
+            FileSegment fileSegment = consumeQueueFileList.get(mid);
+            if (fileSegment.getMinTimestamp() <= timestamp && timestamp <= 
fileSegment.getMaxTimestamp()) {
+                break;
+            } else if (timestamp < fileSegment.getMinTimestamp()) {
+                high = mid - 1;
+            } else {
+                low = mid + 1;
+            }
+            mid = low + (high - low) / 2;
+        }
+        FileSegment target = consumeQueueFileList.get(mid);
+
         // binary search lower bound index in a sorted array
-        long minOffset = cqMin;
-        long maxOffset = cqMax;
+        long minOffset = target.getBaseOffset() / 
MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE;
+        long maxOffset = target.getCommitOffset() / 
MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE - 1;
         List<String> queryLog = new ArrayList<>();
         while (minOffset < maxOffset) {
             long middle = minOffset + (maxOffset - minOffset) / 2;
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 8208d27741..97768d0658 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -177,13 +177,35 @@ public class FlatMessageFileTest {
         // append message to consume queue
         flatFile.consumeQueue.initOffset(50 * ConsumeQueue.CQ_STORE_UNIT_SIZE);
 
-        for (int i = 0; i < 5; i++) {
-            AppendResult appendResult = flatFile.appendConsumeQueue(new 
DispatchRequest(
-                mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN 
* i,
-                MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50 + i,
+        AppendResult appendResult = flatFile.appendConsumeQueue(new 
DispatchRequest(
+                mq.getTopic(), mq.getQueueId(), 0,
+                MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50,
                 "", "", 0, 0, null));
-            Assert.assertEquals(AppendResult.SUCCESS, appendResult);
-        }
+        Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+        appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+                mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN,
+                MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 51,
+                "", "", 0, 0, null));
+        Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+        appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+                mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN 
* 2,
+                MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 52,
+                "", "", 0, 0, null));
+        Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+        appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+                mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN 
* 3,
+                MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 53,
+                "", "", 0, 0, null));
+        Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+        appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+                mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN 
* 4,
+                MessageFormatUtilTest.MSG_LEN, 0, timestamp3, 54,
+                "", "", 0, 0, null));
+        Assert.assertEquals(AppendResult.SUCCESS, appendResult);
 
         // commit message will increase max consume queue offset
         Assert.assertTrue(flatFile.commitAsync().join());

Reply via email to