This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c66e758a1c [ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384)
c66e758a1c is described below

commit c66e758a1cb35f6af9f81c0b919aa156a44fc3df
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue May 6 15:31:03 2025 +0800

    [ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384)
    
    * [ISSUE #9379] Fix timeStoreTable delete logic in IndexService
    
    * [ISSUE #9379] Fix delete logic from TimeStoreTable in IndexService
---
 .../rocketmq/tieredstore/file/FlatAppendFile.java  |  2 +-
 .../tieredstore/index/IndexStoreService.java       | 21 ++++++++-----
 .../tieredstore/index/IndexStoreServiceTest.java   | 36 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index 377341d950..38e451d3ff 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -253,7 +253,7 @@ public class FlatAppendFile {
                 FileSegment fileSegment = fileSegmentTable.get(0);
 
                 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
-                    fileSegment.getMaxTimestamp() > expireTimestamp) {
+                    fileSegment.getMaxTimestamp() >= expireTimestamp) {
                     log.debug("FileSegment has not expired, filePath={}, 
fileType={}, " +
                             "offset={}, expireTimestamp={}, maxTimestamp={}", 
filePath, fileType,
                         fileSegment.getBaseOffset(), expireTimestamp, 
fileSegment.getMaxTimestamp());
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 7fe645da0f..f4f602a105 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -254,12 +254,12 @@ public class IndexStoreService extends ServiceThread implements IndexService {
                 .whenComplete((v, t) -> {
                     // Try to return the query results as much as possible here
                     // rather than directly throwing exceptions
-                    if (result.isEmpty() && t != null) {
-                        future.completeExceptionally(t);
-                    } else {
-                        List<IndexItem> resultList = new 
ArrayList<>(result.values());
-                        future.complete(resultList.subList(0, 
Math.min(resultList.size(), maxCount)));
+                    if (t != null) {
+                        log.error("IndexStoreService#queryAsync, topicId={}, 
key={}, maxCount={}, timestamp={}-{}",
+                            topic, key, maxCount, beginTime, endTime, t);
                     }
+                    List<IndexItem> resultList = new 
ArrayList<>(result.values());
+                    future.complete(resultList.subList(0, 
Math.min(resultList.size(), maxCount)));
                 });
         } catch (Exception e) {
             log.error("IndexStoreService#queryAsync, topicId={}, key={}, 
maxCount={}, timestamp={}-{}",
@@ -344,10 +344,15 @@ public class IndexStoreService extends ServiceThread implements IndexService {
         // delete file in time store table
         readWriteLock.writeLock().lock();
         try {
-            timeStoreTable.entrySet().removeIf(entry ->
-                entry.getKey() < expireTimestamp &&
-                    
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()));
             flatAppendFile.destroyExpiredFile(expireTimestamp);
+            timeStoreTable.entrySet().removeIf(entry ->
+                
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
+                    entry.getKey() < flatAppendFile.getMinTimestamp());
+            int tableSize = (int) timeStoreTable.entrySet().stream()
+                .filter(entry -> 
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
+                .count();
+            log.info("IndexStoreService delete file, timestamp={}, remote={}, 
table={}, all={}",
+                expireTimestamp, flatAppendFile.getFileSegmentList().size(), 
tableSize, timeStoreTable.size());
         } finally {
             readWriteLock.writeLock().unlock();
         }
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index 83b407e73b..7b881ddd44 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -33,15 +33,18 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
 import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
 import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
 import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
 import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
 import org.apache.rocketmq.tieredstore.util.MessageStoreUtilTest;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -206,6 +209,39 @@ public class IndexStoreServiceTest {
         });
     }
 
+    @Test
+    public void deleteFileTest() throws InterruptedException, 
IllegalAccessException {
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        indexService.start();
+
+        for (int i = 0; i < 2 * 20; i++) {
+            AppendResult result = indexService.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, 
Collections.singleton(String.valueOf(i)),
+                i * 100L, MESSAGE_SIZE, System.currentTimeMillis());
+            Assert.assertEquals(AppendResult.SUCCESS, result);
+            TimeUnit.MILLISECONDS.sleep(1);
+        }
+
+        indexService.wakeup();
+        Awaitility.await().until(() -> {
+            int tableSize = (int) 
indexService.getTimeStoreTable().entrySet().stream()
+                .filter(entry -> 
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
+                .count();
+            return tableSize == 2;
+        });
+
+        long timestamp = 
indexService.getTimeStoreTable().firstEntry().getValue().getEndTimestamp();
+        FlatAppendFile flatAppendFile = (FlatAppendFile)
+            FieldUtils.readField(indexService, "flatAppendFile", true);
+
+        indexService.destroyExpiredFile(timestamp);
+        Assert.assertEquals(2, flatAppendFile.getFileSegmentList().size());
+        Assert.assertEquals(3, indexService.getTimeStoreTable().size());
+        indexService.destroyExpiredFile(timestamp + 1);
+        Assert.assertEquals(1, flatAppendFile.getFileSegmentList().size());
+        Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+    }
+
     @Test
     public void restartServiceTest() throws InterruptedException {
         indexService = new IndexStoreService(fileAllocator, filePath);

Reply via email to