This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 9bb73b9a38 [#ISSUE 7222] Bug fix and refactoring of the Indexfile in tiered storage (#7224) 9bb73b9a38 is described below commit 9bb73b9a38548b99ac5126c40380c3c2e7fc586e Author: lizhimins <707364...@qq.com> AuthorDate: Wed Aug 23 09:46:27 2023 +0800 [#ISSUE 7222] Bug fix and refactoring of the Indexfile in tiered storage (#7224) --- .../rocketmq/tieredstore/file/TieredIndexFile.java | 38 ++++++++-- .../tieredstore/file/TieredIndexFileTest.java | 84 +++++----------------- 2 files changed, 52 insertions(+), 70 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java index 50beb01ae4..eda5e01065 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.tieredstore.file; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -99,7 +100,7 @@ public class TieredIndexFile { this::doScheduleTask, 10, 10, TimeUnit.SECONDS); } - private void doScheduleTask() { + protected void doScheduleTask() { try { curFileLock.lock(); try { @@ -145,6 +146,11 @@ public class TieredIndexFile { } } + @VisibleForTesting + public MappedFile getPreMappedFile() { + return preMappedFile; + } + private void initFile() throws IOException { curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); initIndexFileHeader(curMappedFile); @@ -156,19 +162,26 @@ public class TieredIndexFile { if (isFileSealed(curMappedFile)) { if (preFileExists) { - preFile.delete(); + if (preFile.delete()) { + logger.info("Pre IndexFile deleted success", preFilepath); + } else { + logger.error("Pre IndexFile deleted failed", preFilepath); + } } 
boolean rename = curMappedFile.renameTo(preFilepath); if (rename) { preMappedFile = curMappedFile; curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize); + initIndexFileHeader(curMappedFile); preFileExists = true; } } + if (preFileExists) { synchronized (TieredIndexFile.class) { if (inflightCompactFuture.isDone()) { - inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit(new CompactTask(storeConfig, preMappedFile, flatFile), null); + inflightCompactFuture = TieredStoreExecutor.compactIndexFileExecutor.submit( + new CompactTask(storeConfig, preMappedFile, flatFile), null); } } } @@ -261,7 +274,8 @@ public class TieredIndexFile { } } - public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String topic, String key, long beginTime, long endTime) { + public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String topic, String key, long beginTime, + long endTime) { int hashCode = indexKeyHashMethod(buildKey(topic, key)); int slotPosition = hashCode % maxHashSlotNum; List<TieredFileSegment> fileSegmentList = flatFile.getFileListByTime(beginTime, endTime); @@ -355,7 +369,7 @@ public class TieredIndexFile { private final int fileMaxSize; private MappedFile originFile; private TieredFlatFile fileQueue; - private final MappedFile compactFile; + private MappedFile compactFile; public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile, TieredFlatFile fileQueue) throws IOException { @@ -381,6 +395,17 @@ public class TieredIndexFile { } catch (Throwable throwable) { logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable); } + + try { + if (originFile != null) { + originFile.destroy(-1); + } + if (compactFile != null) { + compactFile.destroy(-1); + } + } catch (Throwable throwable) { + logger.error("TieredIndexFile#compactTask: destroy index file failed:", throwable); + } } public void compact() { @@ -396,6 +421,8 @@ public class TieredIndexFile { fileQueue.commit(true); 
compactFile.destroy(-1); originFile.destroy(-1); + compactFile = null; + originFile = null; } private void buildCompactFile() { @@ -414,6 +441,7 @@ public class TieredIndexFile { if (slotValue != -1) { int indexTotalSize = 0; int indexPosition = slotValue; + while (indexPosition >= 0 && indexPosition < maxIndexNum) { int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java index 7ef49578dd..262d6645b3 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java @@ -19,9 +19,8 @@ package org.apache.rocketmq.tieredstore.file; import com.sun.jna.Platform; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; @@ -31,9 +30,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TieredIndexFileTest { @@ -45,11 +42,12 @@ public class TieredIndexFileTest { @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); + storeConfig.setBrokerName("IndexFileBroker"); storeConfig.setStorePathRootDir(storePath); - storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment"); - 
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2); - storeConfig.setTieredStoreIndexFileMaxIndexNum(3); - mq = new MessageQueue("TieredIndexFileTest", storeConfig.getBrokerName(), 1); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); + storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5); + storeConfig.setTieredStoreIndexFileMaxIndexNum(20); + mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1); TieredStoreUtil.getMetadataStore(storeConfig); TieredStoreExecutor.init(); } @@ -61,77 +59,33 @@ public class TieredIndexFileTest { TieredStoreExecutor.shutdown(); } - @Ignore @Test public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException { if (Platform.isWindows()) { return; } - // skip this test on windows - Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS); - TieredFileAllocator fileQueueFactory = new TieredFileAllocator(storeConfig); TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, storePath); + indexFile.append(mq, 0, "key3", 3, 300, 1000); indexFile.append(mq, 0, "key2", 2, 200, 1100); indexFile.append(mq, 0, "key1", 1, 100, 1200); - Awaitility.waitAtMost(5, TimeUnit.SECONDS) - .until(() -> { - List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); - if (indexList.size() != 1) { - return false; - } - - ByteBuffer indexBuffer = indexList.get(0).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, indexBuffer.remaining()); - - Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); - - Assert.assertEquals(3, indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4)); - Assert.assertEquals(300, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8)); - 
Assert.assertEquals(0, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8 + 4)); - return true; - }); - - indexFile.append(mq, 0, "key4", 4, 400, 1300); - indexFile.append(mq, 0, "key4", 4, 400, 1300); - indexFile.append(mq, 0, "key4", 4, 400, 1300); - - Awaitility.waitAtMost(5, TimeUnit.SECONDS) - .until(() -> { - List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join(); - if (indexList.size() != 1) { - return false; - } - - ByteBuffer indexBuffer = indexList.get(0).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining()); - Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); - return true; - }); - - List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join(); + // do not do schedule task here + TieredStoreExecutor.shutdown(); + List<Pair<Long, ByteBuffer>> indexList = + indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); Assert.assertEquals(0, indexList.size()); - indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 1300).join(); - Assert.assertEquals(2, indexList.size()); - - ByteBuffer indexBuffer = indexList.get(0).getValue(); - Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining()); - Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); + // do compaction once + TieredStoreExecutor.init(); + storeConfig.setTieredStoreIndexFileRollingIdleInterval(0); + indexFile.doScheduleTask(); + Awaitility.await().atMost(Duration.ofSeconds(10)) + .until(() -> !indexFile.getPreMappedFile().getFile().exists()); - indexBuffer = indexList.get(1).getValue(); - 
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, indexBuffer.remaining()); - Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4)); - Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8)); - Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4)); + indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); + Assert.assertEquals(1, indexList.size()); } }