This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9bb73b9a38 [#ISSUE 7222] Bug fix and refactoring of the Indexfile in tiered storage (#7224)
9bb73b9a38 is described below

commit 9bb73b9a38548b99ac5126c40380c3c2e7fc586e
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Aug 23 09:46:27 2023 +0800

    [#ISSUE 7222] Bug fix and refactoring of the Indexfile in tiered storage (#7224)
---
 .../rocketmq/tieredstore/file/TieredIndexFile.java | 38 ++++++++--
 .../tieredstore/file/TieredIndexFileTest.java      | 84 +++++-----------------
 2 files changed, 52 insertions(+), 70 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
index 50beb01ae4..eda5e01065 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.tieredstore.file;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -99,7 +100,7 @@ public class TieredIndexFile {
             this::doScheduleTask, 10, 10, TimeUnit.SECONDS);
     }
 
-    private void doScheduleTask() {
+    protected void doScheduleTask() {
         try {
             curFileLock.lock();
             try {
@@ -145,6 +146,11 @@ public class TieredIndexFile {
         }
     }
 
+    @VisibleForTesting
+    public MappedFile getPreMappedFile() {
+        return preMappedFile;
+    }
+
     private void initFile() throws IOException {
         curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
         initIndexFileHeader(curMappedFile);
@@ -156,19 +162,26 @@ public class TieredIndexFile {
 
         if (isFileSealed(curMappedFile)) {
             if (preFileExists) {
-                preFile.delete();
+                if (preFile.delete()) {
+                    logger.info("Pre IndexFile deleted success", preFilepath);
+                } else {
+                    logger.error("Pre IndexFile deleted failed", preFilepath);
+                }
             }
             boolean rename = curMappedFile.renameTo(preFilepath);
             if (rename) {
                 preMappedFile = curMappedFile;
                 curMappedFile = new DefaultMappedFile(curFilePath, 
fileMaxSize);
+                initIndexFileHeader(curMappedFile);
                 preFileExists = true;
             }
         }
+
         if (preFileExists) {
             synchronized (TieredIndexFile.class) {
                 if (inflightCompactFuture.isDone()) {
-                    inflightCompactFuture = 
TieredStoreExecutor.compactIndexFileExecutor.submit(new 
CompactTask(storeConfig, preMappedFile, flatFile), null);
+                    inflightCompactFuture = 
TieredStoreExecutor.compactIndexFileExecutor.submit(
+                        new CompactTask(storeConfig, preMappedFile, flatFile), 
null);
                 }
             }
         }
@@ -261,7 +274,8 @@ public class TieredIndexFile {
         }
     }
 
-    public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String 
topic, String key, long beginTime, long endTime) {
+    public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String 
topic, String key, long beginTime,
+        long endTime) {
         int hashCode = indexKeyHashMethod(buildKey(topic, key));
         int slotPosition = hashCode % maxHashSlotNum;
         List<TieredFileSegment> fileSegmentList = 
flatFile.getFileListByTime(beginTime, endTime);
@@ -355,7 +369,7 @@ public class TieredIndexFile {
         private final int fileMaxSize;
         private MappedFile originFile;
         private TieredFlatFile fileQueue;
-        private final MappedFile compactFile;
+        private MappedFile compactFile;
 
         public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile 
originFile,
             TieredFlatFile fileQueue) throws IOException {
@@ -381,6 +395,17 @@ public class TieredIndexFile {
             } catch (Throwable throwable) {
                 logger.error("TieredIndexFile#compactTask: compact index file 
failed:", throwable);
             }
+
+            try {
+                if (originFile != null) {
+                    originFile.destroy(-1);
+                }
+                if (compactFile != null) {
+                    compactFile.destroy(-1);
+                }
+            } catch (Throwable throwable) {
+                logger.error("TieredIndexFile#compactTask: destroy index file 
failed:", throwable);
+            }
         }
 
         public void compact() {
@@ -396,6 +421,8 @@ public class TieredIndexFile {
             fileQueue.commit(true);
             compactFile.destroy(-1);
             originFile.destroy(-1);
+            compactFile = null;
+            originFile = null;
         }
 
         private void buildCompactFile() {
@@ -414,6 +441,7 @@ public class TieredIndexFile {
                 if (slotValue != -1) {
                     int indexTotalSize = 0;
                     int indexPosition = slotValue;
+
                     while (indexPosition >= 0 && indexPosition < maxIndexNum) {
                         int indexOffset = INDEX_FILE_HEADER_SIZE + 
maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
                             + indexPosition * 
INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
index 7ef49578dd..262d6645b3 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.tieredstore.file;
 import com.sun.jna.Platform;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
@@ -31,9 +30,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TieredIndexFileTest {
@@ -45,11 +42,12 @@ public class TieredIndexFileTest {
     @Before
     public void setUp() {
         storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setBrokerName("IndexFileBroker");
         storeConfig.setStorePathRootDir(storePath);
-        
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
-        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
-        storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
-        mq = new MessageQueue("TieredIndexFileTest", 
storeConfig.getBrokerName(), 1);
+        
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+        storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+        mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1);
         TieredStoreUtil.getMetadataStore(storeConfig);
         TieredStoreExecutor.init();
     }
@@ -61,77 +59,33 @@ public class TieredIndexFileTest {
         TieredStoreExecutor.shutdown();
     }
 
-    @Ignore
     @Test
     public void testAppendAndQuery() throws IOException, 
ClassNotFoundException, NoSuchMethodException {
         if (Platform.isWindows()) {
             return;
         }
 
-        // skip this test on windows
-        Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
-
         TieredFileAllocator fileQueueFactory = new 
TieredFileAllocator(storeConfig);
         TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, 
storePath);
+
         indexFile.append(mq, 0, "key3", 3, 300, 1000);
         indexFile.append(mq, 0, "key2", 2, 200, 1100);
         indexFile.append(mq, 0, "key1", 1, 100, 1200);
 
-        Awaitility.waitAtMost(5, TimeUnit.SECONDS)
-            .until(() -> {
-                List<Pair<Long, ByteBuffer>> indexList = 
indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
-                if (indexList.size() != 1) {
-                    return false;
-                }
-
-                ByteBuffer indexBuffer = indexList.get(0).getValue();
-                
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, 
indexBuffer.remaining());
-
-                Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4));
-                Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8));
-                Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 
4));
-
-                Assert.assertEquals(3, 
indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 
+ 4));
-                Assert.assertEquals(300, 
indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 
4 + 8));
-                Assert.assertEquals(0, 
indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 
4 + 8 + 4));
-                return true;
-            });
-
-        indexFile.append(mq, 0, "key4", 4, 400, 1300);
-        indexFile.append(mq, 0, "key4", 4, 400, 1300);
-        indexFile.append(mq, 0, "key4", 4, 400, 1300);
-
-        Awaitility.waitAtMost(5, TimeUnit.SECONDS)
-            .until(() -> {
-                List<Pair<Long, ByteBuffer>> indexList = 
indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join();
-                if (indexList.size() != 1) {
-                    return false;
-                }
-
-                ByteBuffer indexBuffer = indexList.get(0).getValue();
-                
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, 
indexBuffer.remaining());
-                Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
-                Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
-                Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
-                return true;
-            });
-
-        List<Pair<Long, ByteBuffer>> indexList = 
indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join();
+        // do not do schedule task here
+        TieredStoreExecutor.shutdown();
+        List<Pair<Long, ByteBuffer>> indexList =
+            indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
         Assert.assertEquals(0, indexList.size());
 
-        indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 
1300).join();
-        Assert.assertEquals(2, indexList.size());
-
-        ByteBuffer indexBuffer = indexList.get(0).getValue();
-        Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE 
* 3, indexBuffer.remaining());
-        Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
-        Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
-        Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+        // do compaction once
+        TieredStoreExecutor.init();
+        storeConfig.setTieredStoreIndexFileRollingIdleInterval(0);
+        indexFile.doScheduleTask();
+        Awaitility.await().atMost(Duration.ofSeconds(10))
+            .until(() -> !indexFile.getPreMappedFile().getFile().exists());
 
-        indexBuffer = indexList.get(1).getValue();
-        
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, 
indexBuffer.remaining());
-        Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4));
-        Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8));
-        Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+        indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 
1200).join();
+        Assert.assertEquals(1, indexList.size());
     }
 }

Reply via email to