This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 53fdc4ad3d [ISSUE #9213] Fix get the earliest time error when data is 
clean up in tiered storage (#9214)
53fdc4ad3d is described below

commit 53fdc4ad3d23a1574aed8112fc2d3f34b7e66ad9
Author: lizhimins <707364...@qq.com>
AuthorDate: Mon Mar 3 09:58:06 2025 +0800

    [ISSUE #9213] Fix get the earliest time error when data is clean up in 
tiered storage (#9214)
    
    * [ISSUE #9213] Fix get the earliest time error when data is clean up in 
tiered storage
---
 .../rocketmq/tieredstore/TieredMessageStore.java   | 26 ++++++++++------------
 .../tieredstore/core/MessageStoreFetcherImpl.java  |  9 +-------
 .../rocketmq/tieredstore/file/FlatMessageFile.java | 12 ++++++----
 .../tieredstore/TieredMessageStoreTest.java        |  2 +-
 4 files changed, 22 insertions(+), 27 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 0e3ede871c..f1c935d00b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
 public class TieredMessageStore extends AbstractPluginMessageStore {
 
     protected static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
+    protected static final long MIN_STORE_TIME = -1L;
 
     protected final String brokerName;
     protected final MessageStore defaultStore;
@@ -310,24 +311,21 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         return getEarliestMessageTimeAsync(topic, queueId).join();
     }
 
+    /**
+     * In the original design, getting the earliest time of the first message
+     * would generate two RPC requests. However, using the timestamp stored in the metadata
+     * avoids these requests, although this approach might introduce some level of inaccuracy.
+     */
     @Override
     public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, 
int queueId) {
-        long nextEarliestMessageTime = next.getEarliestMessageTime(topic, 
queueId);
-        long finalNextEarliestMessageTime = nextEarliestMessageTime > 0 ? 
nextEarliestMessageTime : Long.MAX_VALUE;
-        Stopwatch stopwatch = Stopwatch.createStarted();
+        long localMinTime = next.getEarliestMessageTime(topic, queueId);
         return fetcher.getEarliestMessageTimeAsync(topic, queueId)
-            .thenApply(time -> {
-                Attributes latencyAttributes = 
TieredStoreMetricsManager.newAttributesBuilder()
-                    .put(TieredStoreMetricsConstant.LABEL_OPERATION, 
TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
-                    .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
-                    .build();
-                
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
-                if (time < 0) {
-                    log.debug("GetEarliestMessageTimeAsync failed, try to get 
earliest message time from next store: topic: {}, queue: {}",
-                        topic, queueId);
-                    return finalNextEarliestMessageTime != Long.MAX_VALUE ? 
finalNextEarliestMessageTime : -1;
+            .thenApply(remoteMinTime -> {
+                if (localMinTime > MIN_STORE_TIME && remoteMinTime > 
MIN_STORE_TIME) {
+                    return Math.min(localMinTime, remoteMinTime);
                 }
-                return Math.min(finalNextEarliestMessageTime, time);
+                return localMinTime > MIN_STORE_TIME ? localMinTime :
+                    (remoteMinTime > MIN_STORE_TIME ? remoteMinTime : 
MIN_STORE_TIME);
             });
     }
 
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index bc347bd5b4..9e5ab01d3b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -375,14 +375,7 @@ public class MessageStoreFetcherImpl implements 
MessageStoreFetcher {
     @Override
     public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, 
int queueId) {
         FlatMessageFile flatFile = flatFileStore.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
-        if (flatFile == null) {
-            return CompletableFuture.completedFuture(-1L);
-        }
-
-        // read from timestamp to timestamp + length
-        int length = MessageFormatUtil.STORE_TIMESTAMP_POSITION + 8;
-        return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), 
length)
-            .thenApply(MessageFormatUtil::getStoreTimeStamp);
+        return CompletableFuture.completedFuture(flatFile == null ? -1L : 
flatFile.getMinStoreTimestamp());
     }
 
     @Override
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 4510a8a127..ade37149d6 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -178,16 +178,20 @@ public class FlatMessageFile implements FlatFileInterface 
{
         return consumeQueue.append(buffer, request.getStoreTimestamp());
     }
 
-
-
     @Override
     public void release() {
-
     }
 
     @Override
     public long getMinStoreTimestamp() {
-        return commitLog.getMinTimestamp();
+        long minStoreTime = -1L;
+        if (Long.MAX_VALUE != commitLog.getMinTimestamp()) {
+            minStoreTime = Math.max(minStoreTime, commitLog.getMinTimestamp());
+        }
+        if (Long.MAX_VALUE != consumeQueue.getMinTimestamp()) {
+            minStoreTime = Math.max(minStoreTime, 
consumeQueue.getMinTimestamp());
+        }
+        return minStoreTime;
     }
 
     @Override
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 2f39558482..bb259ae811 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -246,7 +246,7 @@ public class TieredMessageStoreTest {
     @Test
     public void testGetEarliestMessageTimeAsync() {
         when(fetcher.getEarliestMessageTimeAsync(anyString(), 
anyInt())).thenReturn(CompletableFuture.completedFuture(1L));
-        Assert.assertEquals(1, (long) 
currentStore.getEarliestMessageTimeAsync(mq.getTopic(), 
mq.getQueueId()).join());
+        Assert.assertEquals(0, (long) 
currentStore.getEarliestMessageTimeAsync(mq.getTopic(), 
mq.getQueueId()).join());
 
         when(fetcher.getEarliestMessageTimeAsync(anyString(), 
anyInt())).thenReturn(CompletableFuture.completedFuture(-1L));
         when(defaultStore.getEarliestMessageTime(anyString(), 
anyInt())).thenReturn(2L);

Reply via email to