This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 729275c0c7 [ISSUE #9271] Enhance tiered storage getQueueOffsetByTimeAsync (#9272) 729275c0c7 is described below commit 729275c0c775a722d2f04fea4b81e251c18bb323 Author: bxfjb <48467309+bx...@users.noreply.github.com> AuthorDate: Thu Mar 27 13:57:47 2025 +0800 [ISSUE #9271] Enhance tiered storage getQueueOffsetByTimeAsync (#9272) --- .../rocketmq/tieredstore/file/FlatMessageFile.java | 22 ++++++++++++-- .../tieredstore/file/FlatMessageFileTest.java | 34 ++++++++++++++++++---- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java index ade37149d6..2519f91ebe 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.tieredstore.common.AppendResult; import org.apache.rocketmq.tieredstore.metadata.MetadataStore; import org.apache.rocketmq.tieredstore.metadata.entity.QueueMetadata; import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata; +import org.apache.rocketmq.tieredstore.provider.FileSegment; import org.apache.rocketmq.tieredstore.util.MessageFormatUtil; import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; import org.slf4j.Logger; @@ -302,9 +303,26 @@ public class FlatMessageFile implements FlatFileInterface { return CompletableFuture.completedFuture(cqMin); } + // get correct consume queue file by binary search + List<FileSegment> consumeQueueFileList = this.consumeQueue.getFileSegmentList(); + int low = 0, high = consumeQueueFileList.size() - 1; + int mid = low + (high - low) / 2; + while (low <= high) { + FileSegment fileSegment = consumeQueueFileList.get(mid); + if (fileSegment.getMinTimestamp() <= timestamp && timestamp <= fileSegment.getMaxTimestamp()) { + break; + } else if (timestamp < fileSegment.getMinTimestamp()) { + high = mid - 1; + } else { + low = mid + 1; + } + mid = low + (high - low) / 2; + } + FileSegment target = consumeQueueFileList.get(mid); + // binary search lower bound index in a sorted array - long minOffset = cqMin; - long maxOffset = cqMax; + long minOffset = target.getBaseOffset() / MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE; + long maxOffset = target.getCommitOffset() / MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE - 1; List<String> queryLog = new ArrayList<>(); while (minOffset < maxOffset) { long middle = minOffset + (maxOffset - minOffset) / 2; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java index 8208d27741..97768d0658 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java @@ -177,13 +177,35 @@ public class FlatMessageFileTest { // append message to consume queue flatFile.consumeQueue.initOffset(50 * ConsumeQueue.CQ_STORE_UNIT_SIZE); - for (int i = 0; i < 5; i++) { - AppendResult appendResult = flatFile.appendConsumeQueue(new DispatchRequest( - mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * i, - MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50 + i, + AppendResult appendResult = flatFile.appendConsumeQueue(new DispatchRequest( + mq.getTopic(), mq.getQueueId(), 0, + MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50, "", "", 0, 0, null)); - Assert.assertEquals(AppendResult.SUCCESS, appendResult); - } + Assert.assertEquals(AppendResult.SUCCESS, appendResult); + + appendResult = flatFile.appendConsumeQueue(new DispatchRequest( + mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN, + MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 51, + "", "", 0, 0, null)); + Assert.assertEquals(AppendResult.SUCCESS, appendResult); + + appendResult = flatFile.appendConsumeQueue(new DispatchRequest( + mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * 2, + MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 52, + "", "", 0, 0, null)); + Assert.assertEquals(AppendResult.SUCCESS, appendResult); + + appendResult = flatFile.appendConsumeQueue(new DispatchRequest( + mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * 3, + MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 53, + "", "", 0, 0, null)); + Assert.assertEquals(AppendResult.SUCCESS, appendResult); + + appendResult = flatFile.appendConsumeQueue(new DispatchRequest( + mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN * 4, + MessageFormatUtilTest.MSG_LEN, 0, timestamp3, 54, + "", "", 0, 0, null)); + Assert.assertEquals(AppendResult.SUCCESS, appendResult); // commit message will increase max consume queue offset Assert.assertTrue(flatFile.commitAsync().join());