This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new c66e758a1c [ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384) c66e758a1c is described below commit c66e758a1cb35f6af9f81c0b919aa156a44fc3df Author: lizhimins <707364...@qq.com> AuthorDate: Tue May 6 15:31:03 2025 +0800 [ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384) * [ISSUE #9379] Fix timeStoreTable delete logic in IndexService * [ISSUE #9379] Fix delete logic from TimeStoreTable in IndexService --- .../rocketmq/tieredstore/file/FlatAppendFile.java | 2 +- .../tieredstore/index/IndexStoreService.java | 21 ++++++++----- .../tieredstore/index/IndexStoreServiceTest.java | 36 ++++++++++++++++++++++ 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java index 377341d950..38e451d3ff 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java @@ -253,7 +253,7 @@ public class FlatAppendFile { FileSegment fileSegment = fileSegmentTable.get(0); if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE && - fileSegment.getMaxTimestamp() > expireTimestamp) { + fileSegment.getMaxTimestamp() >= expireTimestamp) { log.debug("FileSegment has not expired, filePath={}, fileType={}, " + "offset={}, expireTimestamp={}, maxTimestamp={}", filePath, fileType, fileSegment.getBaseOffset(), expireTimestamp, fileSegment.getMaxTimestamp()); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 7fe645da0f..f4f602a105 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -254,12 +254,12 @@ public class IndexStoreService extends ServiceThread implements IndexService { .whenComplete((v, t) -> { // Try to return the query results as much as possible here // rather than directly throwing exceptions - if (result.isEmpty() && t != null) { - future.completeExceptionally(t); - } else { - List<IndexItem> resultList = new ArrayList<>(result.values()); - future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount))); + if (t != null) { + log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}", + topic, key, maxCount, beginTime, endTime, t); } + List<IndexItem> resultList = new ArrayList<>(result.values()); + future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount))); }); } catch (Exception e) { log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}", @@ -344,10 +344,15 @@ public class IndexStoreService extends ServiceThread implements IndexService { // delete file in time store table readWriteLock.writeLock().lock(); try { - timeStoreTable.entrySet().removeIf(entry -> - entry.getKey() < expireTimestamp && - IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus())); flatAppendFile.destroyExpiredFile(expireTimestamp); + timeStoreTable.entrySet().removeIf(entry -> + IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) && + entry.getKey() < flatAppendFile.getMinTimestamp()); + int tableSize = (int) timeStoreTable.entrySet().stream() + .filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus())) + .count(); + log.info("IndexStoreService delete file, timestamp={}, remote={}, table={}, all={}", + expireTimestamp, flatAppendFile.getFileSegmentList().size(), tableSize, timeStoreTable.size()); } finally { readWriteLock.writeLock().unlock(); } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java index 83b407e73b..7b881ddd44 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java @@ -33,15 +33,18 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.tieredstore.MessageStoreConfig; import org.apache.rocketmq.tieredstore.common.AppendResult; +import org.apache.rocketmq.tieredstore.file.FlatAppendFile; import org.apache.rocketmq.tieredstore.file.FlatFileFactory; import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore; import org.apache.rocketmq.tieredstore.metadata.MetadataStore; import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; import org.apache.rocketmq.tieredstore.util.MessageStoreUtilTest; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -206,6 +209,39 @@ public class IndexStoreServiceTest { }); } + @Test + public void deleteFileTest() throws InterruptedException, IllegalAccessException { + indexService = new IndexStoreService(fileAllocator, filePath); + indexService.start(); + + for (int i = 0; i < 2 * 20; i++) { + AppendResult result = indexService.putKey( + TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)), + i * 100L, MESSAGE_SIZE, System.currentTimeMillis()); + Assert.assertEquals(AppendResult.SUCCESS, result); + TimeUnit.MILLISECONDS.sleep(1); + } + + indexService.wakeup(); + Awaitility.await().until(() -> { + int tableSize = (int) indexService.getTimeStoreTable().entrySet().stream() + .filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus())) + .count(); + return tableSize == 2; + }); + + long timestamp = indexService.getTimeStoreTable().firstEntry().getValue().getEndTimestamp(); + FlatAppendFile flatAppendFile = (FlatAppendFile) + FieldUtils.readField(indexService, "flatAppendFile", true); + + indexService.destroyExpiredFile(timestamp); + Assert.assertEquals(2, flatAppendFile.getFileSegmentList().size()); + Assert.assertEquals(3, indexService.getTimeStoreTable().size()); + indexService.destroyExpiredFile(timestamp + 1); + Assert.assertEquals(1, flatAppendFile.getFileSegmentList().size()); + Assert.assertEquals(2, indexService.getTimeStoreTable().size()); + } + @Test public void restartServiceTest() throws InterruptedException { indexService = new IndexStoreService(fileAllocator, filePath);