This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 63130f51e8 [ISSUE #7545] [RIP-65] Support efficient random index for massive messages (#7546)
63130f51e8 is described below

commit 63130f51e84bda2547c3aa442f14184ccefb9180
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue Nov 21 13:57:44 2023 +0800

    [ISSUE #7545] [RIP-65] Support efficient random index for massive messages (#7546)
    
    Support efficient random index for massive messages
    
    Co-authored-by: bareheadtom <1983697...@qq.com>
---
 style/spotbugs-suppressions.xml                    |   2 +-
 tieredstore/pom.xml                                |  14 +
 .../rocketmq/tieredstore/TieredMessageFetcher.java | 103 ++---
 .../tieredstore/file/CompositeQueueFlatFile.java   |  29 +-
 .../tieredstore/file/TieredConsumeQueue.java       |   2 +-
 .../rocketmq/tieredstore/file/TieredFlatFile.java  |   5 +-
 .../tieredstore/file/TieredFlatFileManager.java    |  40 +-
 .../rocketmq/tieredstore/file/TieredIndexFile.java | 470 -------------------
 .../rocketmq/tieredstore/index/IndexFile.java      |  35 ++
 .../rocketmq/tieredstore/index/IndexItem.java      | 114 +++++
 .../rocketmq/tieredstore/index/IndexService.java   |  62 +++
 .../rocketmq/tieredstore/index/IndexStoreFile.java | 499 +++++++++++++++++++++
 .../tieredstore/index/IndexStoreService.java       | 362 +++++++++++++++
 .../tieredstore/provider/TieredFileSegment.java    |   9 +-
 .../tieredstore/provider/TieredStoreProvider.java  |  10 +-
 .../provider/posix/PosixFileSegment.java           |   1 +
 .../tieredstore/TieredMessageFetcherTest.java      |  17 +-
 .../tieredstore/file/TieredIndexFileTest.java      |  93 ----
 .../rocketmq/tieredstore/index/IndexItemTest.java  |  91 ++++
 .../tieredstore/index/IndexStoreFileTest.java      | 282 ++++++++++++
 .../index/IndexStoreServiceBenchTest.java          | 147 ++++++
 .../tieredstore/index/IndexStoreServiceTest.java   | 313 +++++++++++++
 .../tieredstore/util/MessageBufferUtilTest.java    |   1 -
 .../src/test/resources/rmq.logback-test.xml        |  15 +-
 24 files changed, 2019 insertions(+), 697 deletions(-)

diff --git a/style/spotbugs-suppressions.xml b/style/spotbugs-suppressions.xml
index 5778695e1e..6443e029fa 100644
--- a/style/spotbugs-suppressions.xml
+++ b/style/spotbugs-suppressions.xml
@@ -31,7 +31,7 @@
         <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
     </Match>
     <Match>
-        <Class name="org.apache.rocketmq.tieredstore.file.TieredIndexFile"/>
+        <Class name="org.apache.rocketmq.tieredstore.index.TieredIndexFile"/>
         <Method name="indexKeyHashMethod" />
         <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
     </Match>
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index b2ea40bf3a..9f2a8bf228 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -53,5 +53,19 @@
             <artifactId>commons-io</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <version>1.36</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <version>1.36</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index c948fa3fa1..f739773eb3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -50,7 +51,8 @@ import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
 import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
 import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
 import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
+import org.apache.rocketmq.tieredstore.index.IndexItem;
+import org.apache.rocketmq.tieredstore.index.IndexService;
 import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
@@ -58,7 +60,6 @@ import 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
 import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
 
 public class TieredMessageFetcher implements MessageStoreFetcher {
 
@@ -555,85 +556,51 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
     public CompletableFuture<QueryMessageResult> queryMessageAsync(
         String topic, String key, int maxCount, long begin, long end) {
 
-        TieredIndexFile indexFile = 
TieredFlatFileManager.getIndexFile(storeConfig);
+        IndexService indexStoreService = 
TieredFlatFileManager.getTieredIndexService(storeConfig);
 
-        int hashCode = 
TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
         long topicId;
         try {
             TopicMetadata topicMetadata = metadataStore.getTopic(topic);
             if (topicMetadata == null) {
-                LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic 
metadata not found, topic: {}", topic);
+                LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata 
not found, topic: {}", topic);
                 return CompletableFuture.completedFuture(new 
QueryMessageResult());
             }
             topicId = topicMetadata.getTopicId();
         } catch (Exception e) {
-            LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id 
failed, topic: {}", topic, e);
+            LOGGER.error("MessageFetcher#queryMessageAsync, get topic id 
failed, topic: {}", topic, e);
             return CompletableFuture.completedFuture(new QueryMessageResult());
         }
 
-        return indexFile.queryAsync(topic, key, begin, end)
-            .thenCompose(indexBufferList -> {
-                QueryMessageResult result = new QueryMessageResult();
-                int resultCount = 0;
-                List<CompletableFuture<Void>> futureList = new 
ArrayList<>(maxCount);
-                for (Pair<Long, ByteBuffer> pair : indexBufferList) {
-                    Long fileBeginTimestamp = pair.getKey();
-                    ByteBuffer indexBuffer = pair.getValue();
-
-                    if (indexBuffer.remaining() % 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
-                        LOGGER.error("[Bug] 
TieredMessageFetcher#queryMessageAsync: " +
-                            "index buffer size {} is not multiple of index 
item size {}",
-                            indexBuffer.remaining(), 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
-                        continue;
-                    }
-
-                    for (int indexOffset = indexBuffer.position();
-                        indexOffset < indexBuffer.limit();
-                        indexOffset += 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
-
-                        int indexItemHashCode = 
indexBuffer.getInt(indexOffset);
-                        if (indexItemHashCode != hashCode) {
-                            continue;
-                        }
-
-                        int indexItemTopicId = indexBuffer.getInt(indexOffset 
+ 4);
-                        if (indexItemTopicId != topicId) {
-                            continue;
-                        }
-
-                        int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
-                        CompositeFlatFile flatFile =
-                            flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
-                        if (flatFile == null) {
-                            continue;
-                        }
-
-                        // decode index item
-                        long offset = indexBuffer.getLong(indexOffset + 4 + 4 
+ 4);
-                        int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 
+ 8);
-                        int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 
+ 4 + 8 + 4);
-                        long indexTimestamp = fileBeginTimestamp + timeDiff;
-                        if (indexTimestamp < begin || indexTimestamp > end) {
-                            continue;
-                        }
+        CompletableFuture<List<IndexItem>> future = 
indexStoreService.queryAsync(topic, key, maxCount, begin, end);
 
-                        CompletableFuture<Void> getMessageFuture = 
flatFile.getCommitLogAsync(offset, size)
-                            .thenAccept(messageBuffer -> result.addMessage(
-                                new SelectMappedBufferResult(0, messageBuffer, 
size, null)));
-                        futureList.add(getMessageFuture);
-
-                        resultCount++;
-                        if (resultCount >= maxCount) {
-                            break;
-                        }
-                    }
-
-                    if (resultCount >= maxCount) {
-                        break;
-                    }
+        return future.thenCompose(indexItemList -> {
+            QueryMessageResult result = new QueryMessageResult();
+            List<CompletableFuture<Void>> futureList = new 
ArrayList<>(maxCount);
+            for (IndexItem indexItem : indexItemList) {
+                if (topicId != indexItem.getTopicId()) {
+                    continue;
                 }
-                return CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0]))
-                    .thenApply(v -> result);
-            });
+                CompositeFlatFile flatFile =
+                    flatFileManager.getFlatFile(new MessageQueue(topic, 
brokerName, indexItem.getQueueId()));
+                if (flatFile == null) {
+                    continue;
+                }
+                CompletableFuture<Void> getMessageFuture = flatFile
+                    .getCommitLogAsync(indexItem.getOffset(), 
indexItem.getSize())
+                    .thenAccept(messageBuffer -> result.addMessage(
+                        new SelectMappedBufferResult(
+                            indexItem.getOffset(), messageBuffer, 
indexItem.getSize(), null)));
+                futureList.add(getMessageFuture);
+                if (futureList.size() >= maxCount) {
+                    break;
+                }
+            }
+            return CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).thenApply(v -> result);
+        }).whenComplete((result, throwable) -> {
+            if (result != null) {
+                LOGGER.info("MessageFetcher#queryMessageAsync, query result: 
{}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
+                    result.getMessageBufferList().size(), topic, topicId, key, 
maxCount, begin, end);
+            }
+        });
     }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index 0a797f465f..67d2cf0646 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -17,11 +17,15 @@
 
 package org.apache.rocketmq.tieredstore.file;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.index.IndexService;
 import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
 import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends 
CompositeFlatFile {
     private final MessageQueue messageQueue;
     private long topicSequenceNumber;
     private QueueMetadata queueMetadata;
-    private final TieredIndexFile indexFile;
+    private final IndexService indexStoreService;
 
     public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, 
MessageQueue messageQueue) {
         super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
         this.messageQueue = messageQueue;
         this.recoverQueueMetadata();
-        this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
+        this.indexStoreService = 
TieredFlatFileManager.getTieredIndexService(storeConfig);
     }
 
     @Override
@@ -85,24 +89,15 @@ public class CompositeQueueFlatFile extends 
CompositeFlatFile {
             return AppendResult.FILE_CLOSED;
         }
 
+        Set<String> keySet = new HashSet<>(
+            
Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR)));
         if (StringUtils.isNotBlank(request.getUniqKey())) {
-            AppendResult result = indexFile.append(messageQueue, (int) 
topicSequenceNumber,
-                request.getUniqKey(), request.getCommitLogOffset(), 
request.getMsgSize(), request.getStoreTimestamp());
-            if (result != AppendResult.SUCCESS) {
-                return result;
-            }
+            keySet.add(request.getUniqKey());
         }
 
-        for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) 
{
-            if (StringUtils.isNotBlank(key)) {
-                AppendResult result = indexFile.append(messageQueue, (int) 
topicSequenceNumber,
-                    key, request.getCommitLogOffset(), request.getMsgSize(), 
request.getStoreTimestamp());
-                if (result != AppendResult.SUCCESS) {
-                    return result;
-                }
-            }
-        }
-        return AppendResult.SUCCESS;
+        return indexStoreService.putKey(
+            messageQueue.getTopic(), (int) topicSequenceNumber, 
messageQueue.getQueueId(), keySet,
+            request.getCommitLogOffset(), request.getMsgSize(), 
request.getStoreTimestamp());
     }
 
     public MessageQueue getMessageQueue() {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
index 35007f8cbf..6953db032d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
@@ -20,9 +20,9 @@ import com.google.common.annotations.VisibleForTesting;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.common.BoundaryType;
 
 public class TieredConsumeQueue {
 
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index d96eb6e8f3..a41d562d10 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -141,7 +141,6 @@ public class TieredFlatFile {
         return fileType;
     }
 
-    @VisibleForTesting
     public List<TieredFileSegment> getFileSegmentList() {
         return fileSegmentList;
     }
@@ -274,7 +273,7 @@ public class TieredFlatFile {
     }
 
     @Nullable
-    protected TieredFileSegment getFileByIndex(int index) {
+    public TieredFileSegment getFileByIndex(int index) {
         fileSegmentLock.readLock().lock();
         try {
             if (index < fileSegmentList.size()) {
@@ -354,7 +353,7 @@ public class TieredFlatFile {
         }
     }
 
-    protected List<TieredFileSegment> getFileListByTime(long beginTime, long 
endTime) {
+    public List<TieredFileSegment> getFileListByTime(long beginTime, long 
endTime) {
         fileSegmentLock.readLock().lock();
         try {
             return fileSegmentList.stream()
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index 087ea8c9ce..ffe0836f12 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.index.IndexService;
+import org.apache.rocketmq.tieredstore.index.IndexStoreService;
 import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
@@ -43,7 +45,7 @@ public class TieredFlatFileManager {
     private static final Logger logger = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
 
     private static volatile TieredFlatFileManager instance;
-    private static volatile TieredIndexFile indexFile;
+    private static volatile IndexStoreService indexStoreService;
 
     private final TieredMetadataStore metadataStore;
     private final TieredMessageStoreConfig storeConfig;
@@ -76,25 +78,26 @@ public class TieredFlatFileManager {
         return instance;
     }
 
-    public static TieredIndexFile getIndexFile(TieredMessageStoreConfig 
storeConfig) {
+    public static IndexService getTieredIndexService(TieredMessageStoreConfig 
storeConfig) {
         if (storeConfig == null) {
-            return indexFile;
+            return indexStoreService;
         }
 
-        if (indexFile == null) {
+        if (indexStoreService == null) {
             synchronized (TieredFlatFileManager.class) {
-                if (indexFile == null) {
+                if (indexStoreService == null) {
                     try {
                         String filePath = TieredStoreUtil.toPath(new 
MessageQueue(
                             TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, 
storeConfig.getBrokerName(), 0));
-                        indexFile = new TieredIndexFile(new 
TieredFileAllocator(storeConfig), filePath);
+                        indexStoreService = new IndexStoreService(new 
TieredFileAllocator(storeConfig), filePath);
+                        indexStoreService.start();
                     } catch (Exception e) {
                         logger.error("Construct FlatFileManager indexFile 
error", e);
                     }
                 }
             }
         }
-        return indexFile;
+        return indexStoreService;
     }
 
     public void doCommit() {
@@ -120,15 +123,6 @@ public class TieredFlatFileManager {
                 }
             }, delay, TimeUnit.MILLISECONDS);
         }
-        TieredStoreExecutor.commitExecutor.schedule(() -> {
-            try {
-                if (indexFile != null) {
-                    indexFile.commit(true);
-                }
-            } catch (Throwable e) {
-                logger.error("Commit indexFile periodically failed", e);
-            }
-        }, 0, TimeUnit.MILLISECONDS);
     }
 
     public void doCleanExpiredFile() {
@@ -148,10 +142,6 @@ public class TieredFlatFileManager {
                 }
             });
         }
-        if (indexFile != null) {
-            indexFile.cleanExpiredFile(expiredTimeStamp);
-            indexFile.destroyExpiredFile();
-        }
     }
 
     private void doScheduleTask() {
@@ -244,7 +234,7 @@ public class TieredFlatFileManager {
 
     private static void cleanStaticReference() {
         instance = null;
-        indexFile = null;
+        indexStoreService = null;
     }
 
     @Nullable
@@ -271,8 +261,8 @@ public class TieredFlatFileManager {
     }
 
     public void shutdown() {
-        if (indexFile != null) {
-            indexFile.commit(true);
+        if (indexStoreService != null) {
+            indexStoreService.shutdown();
         }
         for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) {
             flatFile.shutdown();
@@ -280,8 +270,8 @@ public class TieredFlatFileManager {
     }
 
     public void destroy() {
-        if (indexFile != null) {
-            indexFile.destroy();
+        if (indexStoreService != null) {
+            indexStoreService.destroy();
         }
         ImmutableList<CompositeQueueFlatFile> flatFileList = 
deepCopyFlatFileToList();
         cleanup();
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
deleted file mode 100644
index eda5e01065..0000000000
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.file;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.index.IndexHeader;
-import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
-import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-
-public class TieredIndexFile {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
-
-    // header format:
-    // magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + 
index num(4)
-    public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
-    public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
-    public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
-    public static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
-    public static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
-    public static final int INDEX_FILE_HEADER_SIZE = 28;
-
-    // index item
-    public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 
1880681586 + 4;
-    public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 
1880681586 + 8;
-    public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
-    public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
-    public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
-
-    private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
-    private static final String CUR_INDEX_FILE_NAME = "0000";
-    private static final String PRE_INDEX_FILE_NAME = "1111";
-    private static final String COMPACT_FILE_NAME = "2222";
-
-    private final TieredMessageStoreConfig storeConfig;
-    private final TieredFlatFile flatFile;
-    private final int maxHashSlotNum;
-    private final int maxIndexNum;
-    private final int fileMaxSize;
-    private final String curFilePath;
-    private final String preFilepath;
-    private MappedFile preMappedFile;
-    private MappedFile curMappedFile;
-
-    private final ReentrantLock curFileLock = new ReentrantLock();
-    private Future<Void> inflightCompactFuture = 
CompletableFuture.completedFuture(null);
-
-    protected TieredIndexFile(TieredFileAllocator fileQueueFactory, String 
filePath) throws IOException {
-        this.storeConfig = fileQueueFactory.getStoreConfig();
-        this.flatFile = fileQueueFactory.createFlatFileForIndexFile(filePath);
-        if (flatFile.getBaseOffset() == -1) {
-            flatFile.setBaseOffset(0);
-        }
-        this.maxHashSlotNum = 
storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
-        this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
-        this.fileMaxSize = IndexHeader.INDEX_HEADER_SIZE
-            + this.maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
-            + this.maxIndexNum * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE
-            + 4;
-        this.curFilePath = Paths.get(
-            storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, 
CUR_INDEX_FILE_NAME).toString();
-        this.preFilepath = Paths.get(
-            storeConfig.getStorePathRootDir(), INDEX_FILE_DIR_NAME, 
PRE_INDEX_FILE_NAME).toString();
-        initFile();
-        TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(
-            this::doScheduleTask, 10, 10, TimeUnit.SECONDS);
-    }
-
-    protected void doScheduleTask() {
-        try {
-            curFileLock.lock();
-            try {
-                synchronized (TieredIndexFile.class) {
-                    MappedByteBuffer mappedByteBuffer = 
curMappedFile.getMappedByteBuffer();
-                    int indexNum = 
mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
-                    long lastIndexTime = 
mappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
-                    if (indexNum > 0 &&
-                        System.currentTimeMillis() - lastIndexTime >
-                            
storeConfig.getTieredStoreIndexFileRollingIdleInterval()) {
-                        mappedByteBuffer.putInt(fileMaxSize - 4, 
INDEX_FILE_END_MAGIC_CODE);
-                        rollingFile();
-                    }
-                    if (inflightCompactFuture.isDone() && preMappedFile != 
null && preMappedFile.isAvailable()) {
-                        inflightCompactFuture = 
TieredStoreExecutor.compactIndexFileExecutor.submit(
-                            new CompactTask(storeConfig, preMappedFile, 
flatFile), null);
-                    }
-                }
-            } finally {
-                curFileLock.unlock();
-            }
-        } catch (Throwable throwable) {
-            logger.error("TieredIndexFile: submit compact index file task 
failed:", throwable);
-        }
-    }
-
-    private static boolean isFileSealed(MappedFile mappedFile) {
-        return 
mappedFile.getMappedByteBuffer().getInt(mappedFile.getFileSize() - 4) == 
INDEX_FILE_END_MAGIC_CODE;
-    }
-
-    private void initIndexFileHeader(MappedFile mappedFile) {
-        MappedByteBuffer mappedByteBuffer = mappedFile.getMappedByteBuffer();
-        if (mappedByteBuffer.getInt(0) != INDEX_FILE_BEGIN_MAGIC_CODE) {
-            mappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, 
INDEX_FILE_BEGIN_MAGIC_CODE);
-            
mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, -1L);
-            
mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, -1L);
-            mappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 0);
-            mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 0);
-            for (int i = 0; i < maxHashSlotNum; i++) {
-                mappedByteBuffer.putInt(INDEX_FILE_HEADER_SIZE + i * 
INDEX_FILE_HASH_SLOT_SIZE, -1);
-            }
-            mappedByteBuffer.putInt(fileMaxSize - 4, -1);
-        }
-    }
-
-    @VisibleForTesting
-    public MappedFile getPreMappedFile() {
-        return preMappedFile;
-    }
-
-    private void initFile() throws IOException {
-        curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
-        initIndexFileHeader(curMappedFile);
-        File preFile = new File(preFilepath);
-        boolean preFileExists = preFile.exists();
-        if (preFileExists) {
-            preMappedFile = new DefaultMappedFile(preFilepath, fileMaxSize);
-        }
-
-        if (isFileSealed(curMappedFile)) {
-            if (preFileExists) {
-                if (preFile.delete()) {
-                    logger.info("Pre IndexFile deleted success", preFilepath);
-                } else {
-                    logger.error("Pre IndexFile deleted failed", preFilepath);
-                }
-            }
-            boolean rename = curMappedFile.renameTo(preFilepath);
-            if (rename) {
-                preMappedFile = curMappedFile;
-                curMappedFile = new DefaultMappedFile(curFilePath, 
fileMaxSize);
-                initIndexFileHeader(curMappedFile);
-                preFileExists = true;
-            }
-        }
-
-        if (preFileExists) {
-            synchronized (TieredIndexFile.class) {
-                if (inflightCompactFuture.isDone()) {
-                    inflightCompactFuture = 
TieredStoreExecutor.compactIndexFileExecutor.submit(
-                        new CompactTask(storeConfig, preMappedFile, flatFile), 
null);
-                }
-            }
-        }
-    }
-
-    public AppendResult append(MessageQueue mq, int topicId, String key, long 
offset, int size, long timeStamp) {
-        return putKey(mq, topicId, indexKeyHashMethod(buildKey(mq.getTopic(), 
key)), offset, size, timeStamp);
-    }
-
-    private boolean rollingFile() throws IOException {
-        File preFile = new File(preFilepath);
-        boolean preFileExists = preFile.exists();
-        if (!preFileExists) {
-            boolean rename = curMappedFile.renameTo(preFilepath);
-            if (rename) {
-                preMappedFile = curMappedFile;
-                curMappedFile = new DefaultMappedFile(curFilePath, 
fileMaxSize);
-                initIndexFileHeader(curMappedFile);
-                tryToCompactPreFile();
-                return true;
-            } else {
-                logger.error("TieredIndexFile#rollingFile: rename current file 
failed");
-                return false;
-            }
-        }
-        tryToCompactPreFile();
-        return false;
-    }
-
-    private void tryToCompactPreFile() throws IOException {
-        synchronized (TieredIndexFile.class) {
-            if (inflightCompactFuture.isDone()) {
-                inflightCompactFuture = 
TieredStoreExecutor.compactIndexFileExecutor.submit(new 
CompactTask(storeConfig, preMappedFile, flatFile), null);
-            }
-        }
-    }
-
-    private AppendResult putKey(MessageQueue mq, int topicId, int hashCode, 
long offset, int size, long timeStamp) {
-        curFileLock.lock();
-        try {
-            if (isFileSealed(curMappedFile) && !rollingFile()) {
-                return AppendResult.FILE_FULL;
-            }
-
-            MappedByteBuffer mappedByteBuffer = 
curMappedFile.getMappedByteBuffer();
-
-            int slotPosition = hashCode % maxHashSlotNum;
-            int slotOffset = INDEX_FILE_HEADER_SIZE + slotPosition * 
INDEX_FILE_HASH_SLOT_SIZE;
-
-            int slotValue = mappedByteBuffer.getInt(slotOffset);
-
-            long beginTimeStamp = 
mappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
-            if (beginTimeStamp == -1) {
-                
mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, 
timeStamp);
-                beginTimeStamp = timeStamp;
-            }
-
-            int indexCount = 
mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
-            int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * 
INDEX_FILE_HASH_SLOT_SIZE
-                + indexCount * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
-
-            int timeDiff = (int) (timeStamp - beginTimeStamp);
-
-            // put hash index
-            mappedByteBuffer.putInt(indexOffset, hashCode);
-            mappedByteBuffer.putInt(indexOffset + 4, topicId);
-            mappedByteBuffer.putInt(indexOffset + 4 + 4, mq.getQueueId());
-            mappedByteBuffer.putLong(indexOffset + 4 + 4 + 4, offset);
-            mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8, size);
-            mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4, timeDiff);
-            mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4, 
slotValue);
-
-            // put hash slot
-            mappedByteBuffer.putInt(slotOffset, indexCount);
-
-            // put header
-            indexCount += 1;
-            mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 
indexCount);
-            
mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, timeStamp);
-            if (indexCount == maxIndexNum) {
-                mappedByteBuffer.putInt(fileMaxSize - 4, 
INDEX_FILE_END_MAGIC_CODE);
-                rollingFile();
-            }
-            return AppendResult.SUCCESS;
-        } catch (Exception e) {
-            logger.error("TieredIndexFile#putKey: put key failed:", e);
-            return AppendResult.IO_ERROR;
-        } finally {
-            curFileLock.unlock();
-        }
-    }
-
-    public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String 
topic, String key, long beginTime,
-        long endTime) {
-        int hashCode = indexKeyHashMethod(buildKey(topic, key));
-        int slotPosition = hashCode % maxHashSlotNum;
-        List<TieredFileSegment> fileSegmentList = 
flatFile.getFileListByTime(beginTime, endTime);
-        CompletableFuture<List<Pair<Long, ByteBuffer>>> future = null;
-        for (int i = fileSegmentList.size() - 1; i >= 0; i--) {
-            TieredFileSegment fileSegment = fileSegmentList.get(i);
-            CompletableFuture<ByteBuffer> tmpFuture = 
fileSegment.readAsync(INDEX_FILE_HEADER_SIZE + (long) slotPosition * 
INDEX_FILE_HASH_SLOT_SIZE, INDEX_FILE_HASH_SLOT_SIZE)
-                .thenCompose(slotBuffer -> {
-                    int indexPosition = slotBuffer.getInt();
-                    if (indexPosition == -1) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    int indexSize = slotBuffer.getInt();
-                    if (indexSize <= 0) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    return fileSegment.readAsync(indexPosition, indexSize);
-                });
-            if (future == null) {
-                future = tmpFuture.thenApply(indexBuffer -> {
-                    List<Pair<Long, ByteBuffer>> result = new ArrayList<>();
-                    if (indexBuffer != null) {
-                        result.add(Pair.of(fileSegment.getMinTimestamp(), 
indexBuffer));
-                    }
-                    return result;
-                });
-            } else {
-                future = future.thenCombine(tmpFuture, (indexList, 
indexBuffer) -> {
-                    if (indexBuffer != null) {
-                        indexList.add(Pair.of(fileSegment.getMinTimestamp(), 
indexBuffer));
-                    }
-                    return indexList;
-                });
-            }
-        }
-        return future == null ? CompletableFuture.completedFuture(new 
ArrayList<>()) : future;
-    }
-
-    public static String buildKey(String topic, String key) {
-        return topic + "#" + key;
-    }
-
-    public static int indexKeyHashMethod(String key) {
-        int keyHash = key.hashCode();
-        int keyHashPositive = Math.abs(keyHash);
-        if (keyHashPositive < 0)
-            keyHashPositive = 0;
-        return keyHashPositive;
-    }
-
-    public void commit(boolean sync) {
-        flatFile.commit(sync);
-        if (sync) {
-            try {
-                inflightCompactFuture.get();
-            } catch (Exception ignore) {
-            }
-        }
-    }
-
-    public void cleanExpiredFile(long expireTimestamp) {
-        flatFile.cleanExpiredFile(expireTimestamp);
-    }
-
-    public void destroyExpiredFile() {
-        flatFile.destroyExpiredFile();
-    }
-
-    public void destroy() {
-        inflightCompactFuture.cancel(true);
-        if (preMappedFile != null) {
-            preMappedFile.destroy(-1);
-        }
-        if (curMappedFile != null) {
-            curMappedFile.destroy(-1);
-        }
-        String compactFilePath = storeConfig.getStorePathRootDir() + 
File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
-        File compactFile = new File(compactFilePath);
-        if (compactFile.exists()) {
-            compactFile.delete();
-        }
-        flatFile.destroy();
-    }
-
-    static class CompactTask implements Runnable {
-        private final TieredMessageStoreConfig storeConfig;
-
-        private final int maxHashSlotNum;
-        private final int maxIndexNum;
-        private final int fileMaxSize;
-        private MappedFile originFile;
-        private TieredFlatFile fileQueue;
-        private MappedFile compactFile;
-
-        public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile 
originFile,
-            TieredFlatFile fileQueue) throws IOException {
-            this.storeConfig = storeConfig;
-            this.maxHashSlotNum = 
storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
-            this.maxIndexNum = 
storeConfig.getTieredStoreIndexFileMaxIndexNum();
-            this.originFile = originFile;
-            this.fileQueue = fileQueue;
-            String compactFilePath = storeConfig.getStorePathRootDir() + 
File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
-            fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + 
(storeConfig.getTieredStoreIndexFileMaxHashSlotNum() * 
INDEX_FILE_HASH_SLOT_SIZE) + (storeConfig.getTieredStoreIndexFileMaxIndexNum() 
* INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
-            // TODO check magic code, upload immediately when compact complete
-            File compactFile = new File(compactFilePath);
-            if (compactFile.exists()) {
-                compactFile.delete();
-            }
-            this.compactFile = new DefaultMappedFile(compactFilePath, 
fileMaxSize);
-        }
-
-        @Override
-        public void run() {
-            try {
-                compact();
-            } catch (Throwable throwable) {
-                logger.error("TieredIndexFile#compactTask: compact index file 
failed:", throwable);
-            }
-
-            try {
-                if (originFile != null) {
-                    originFile.destroy(-1);
-                }
-                if (compactFile != null) {
-                    compactFile.destroy(-1);
-                }
-            } catch (Throwable throwable) {
-                logger.error("TieredIndexFile#compactTask: destroy index file 
failed:", throwable);
-            }
-        }
-
-        public void compact() {
-            if (!isFileSealed(originFile)) {
-                logger.error("[Bug]TieredIndexFile#CompactTask#compact: try to 
compact unsealed file");
-                originFile.destroy(-1);
-                compactFile.destroy(-1);
-                return;
-            }
-
-            buildCompactFile();
-            fileQueue.append(compactFile.getMappedByteBuffer());
-            fileQueue.commit(true);
-            compactFile.destroy(-1);
-            originFile.destroy(-1);
-            compactFile = null;
-            originFile = null;
-        }
-
-        private void buildCompactFile() {
-            MappedByteBuffer originMappedByteBuffer = 
originFile.getMappedByteBuffer();
-            MappedByteBuffer compactMappedByteBuffer = 
compactFile.getMappedByteBuffer();
-            
compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, 
INDEX_FILE_BEGIN_MAGIC_CODE);
-            
compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, 
originMappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION));
-            
compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, 
originMappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION));
-            
compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 
maxHashSlotNum);
-            
compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 
originMappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION));
-
-            int rePutSlotValue = INDEX_FILE_HEADER_SIZE + (maxHashSlotNum * 
INDEX_FILE_HASH_SLOT_SIZE);
-            for (int i = 0; i < maxHashSlotNum; i++) {
-                int slotOffset = INDEX_FILE_HEADER_SIZE + i * 
INDEX_FILE_HASH_SLOT_SIZE;
-                int slotValue = originMappedByteBuffer.getInt(slotOffset);
-                if (slotValue != -1) {
-                    int indexTotalSize = 0;
-                    int indexPosition = slotValue;
-
-                    while (indexPosition >= 0 && indexPosition < maxIndexNum) {
-                        int indexOffset = INDEX_FILE_HEADER_SIZE + 
maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
-                            + indexPosition * 
INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
-                        int rePutIndexOffset = rePutSlotValue + indexTotalSize;
-
-                        compactMappedByteBuffer.putInt(rePutIndexOffset, 
originMappedByteBuffer.getInt(indexOffset));
-                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4, 
originMappedByteBuffer.getInt(indexOffset + 4));
-                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 
4, originMappedByteBuffer.getInt(indexOffset + 4 + 4));
-                        compactMappedByteBuffer.putLong(rePutIndexOffset + 4 + 
4 + 4, originMappedByteBuffer.getLong(indexOffset + 4 + 4 + 4));
-                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 
4 + 4 + 8, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8));
-                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 
4 + 4 + 8 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4));
-
-                        indexTotalSize += INDEX_FILE_HASH_COMPACT_INDEX_SIZE;
-                        indexPosition = 
originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4);
-                    }
-                    compactMappedByteBuffer.putInt(slotOffset, rePutSlotValue);
-                    compactMappedByteBuffer.putInt(slotOffset + 4, 
indexTotalSize);
-                    rePutSlotValue += indexTotalSize;
-                }
-            }
-            
compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, 
INDEX_FILE_END_MAGIC_CODE);
-            compactMappedByteBuffer.putInt(rePutSlotValue, 
INDEX_FILE_BEGIN_MAGIC_CODE);
-            compactMappedByteBuffer.limit(rePutSlotValue + 4);
-        }
-    }
-}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
new file mode 100644
index 0000000000..d131b9b53e
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexFile.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A single index file that, in addition to the {@link IndexService} operations,
+ * exposes its identifying timestamp, its lifecycle status and a compaction hook.
+ */
+public interface IndexFile extends IndexService {
+
+    /**
+     * Enumeration for the status of the index file.
+     *
+     * <p>As used by {@code IndexStoreFile}: a locally mapped file starts as {@code UNSEALED}
+     * and is switched to {@code SEALED} by {@code putKey} once it cannot take more items;
+     * a file backed by a remote segment is created as {@code UPLOAD}; queries against a
+     * {@code SHUTDOWN} file return an empty result.
+     */
+    enum IndexStatusEnum {
+        SHUTDOWN, UNSEALED, SEALED, UPLOAD
+    }
+
+    /**
+     * Returns the timestamp that identifies this file. {@code IndexStoreFile} returns its
+     * begin timestamp here.
+     */
+    long getTimestamp();
+
+    /**
+     * Returns the current lifecycle status of this file.
+     */
+    IndexStatusEnum getFileStatus();
+
+    /**
+     * Compacts this file and returns the result as a buffer.
+     * NOTE(review): the compacted layout and any status transition are defined by the
+     * implementation, which is not visible from this interface - confirm there.
+     */
+    ByteBuffer doCompaction();
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
new file mode 100644
index 0000000000..24ccc4322f
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexItem.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Immutable record of one index entry.
+ *
+ * <p>Serialized layout (byte offsets): hashCode(0, int) + topicId(4, int) + queueId(8, int)
+ * + offset(12, long) + size(20, int) + timeDiff(24, int) + itemIndex(28, int).
+ * The compact form is the first {@link #COMPACT_INDEX_ITEM_SIZE} bytes, i.e. the same
+ * layout without the trailing itemIndex.
+ */
+public class IndexItem {
+
+    public static final int INDEX_ITEM_SIZE = 32;
+    public static final int COMPACT_INDEX_ITEM_SIZE = 28;
+
+    private final int hashCode;
+    private final int topicId;
+    private final int queueId;
+    private final long offset;
+    private final int size;
+    private final int timeDiff;
+    private final int itemIndex;
+
+    /**
+     * Creates an item from its individual field values.
+     */
+    public IndexItem(int topicId, int queueId, long offset, int size, int hashCode, int timeDiff, int itemIndex) {
+        this.topicId = topicId;
+        this.queueId = queueId;
+        this.offset = offset;
+        this.size = size;
+        this.hashCode = hashCode;
+        this.timeDiff = timeDiff;
+        this.itemIndex = itemIndex;
+    }
+
+    /**
+     * Decodes an item from its serialized form.
+     *
+     * @param bytes either a full ({@link #INDEX_ITEM_SIZE}) or a compact
+     *              ({@link #COMPACT_INDEX_ITEM_SIZE}) item; for the compact form
+     *              itemIndex is set to 0
+     * @throws IllegalArgumentException if {@code bytes} is null or has any other length
+     */
+    public IndexItem(byte[] bytes) {
+        final boolean fullLayout = bytes != null && bytes.length == INDEX_ITEM_SIZE;
+        final boolean compactLayout = bytes != null && bytes.length == COMPACT_INDEX_ITEM_SIZE;
+        if (!fullLayout && !compactLayout) {
+            throw new IllegalArgumentException("Byte array length not correct");
+        }
+
+        // Fields are read sequentially in their serialized order.
+        final ByteBuffer reader = ByteBuffer.wrap(bytes);
+        this.hashCode = reader.getInt();
+        this.topicId = reader.getInt();
+        this.queueId = reader.getInt();
+        this.offset = reader.getLong();
+        this.size = reader.getInt();
+        this.timeDiff = reader.getInt();
+        this.itemIndex = fullLayout ? reader.getInt() : 0;
+    }
+
+    /**
+     * Serializes this item into a newly allocated buffer of {@link #INDEX_ITEM_SIZE} bytes.
+     * The returned buffer is positioned at 0 with its limit at the item size, ready to be read.
+     */
+    public ByteBuffer getByteBuffer() {
+        final ByteBuffer writer = ByteBuffer.allocate(INDEX_ITEM_SIZE);
+        writer.putInt(hashCode)
+            .putInt(topicId)
+            .putInt(queueId)
+            .putLong(offset)
+            .putInt(size)
+            .putInt(timeDiff)
+            .putInt(itemIndex);
+        // Rewind to position 0; the limit stays at INDEX_ITEM_SIZE because the buffer is full.
+        writer.flip();
+        return writer;
+    }
+
+    public int getHashCode() {
+        return this.hashCode;
+    }
+
+    public int getTopicId() {
+        return this.topicId;
+    }
+
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    public long getOffset() {
+        return this.offset;
+    }
+
+    public int getSize() {
+        return this.size;
+    }
+
+    public int getTimeDiff() {
+        return this.timeDiff;
+    }
+
+    public int getItemIndex() {
+        return this.itemIndex;
+    }
+
+    @Override
+    public String toString() {
+        // itemIndex is reported under the label "position".
+        final StringBuilder text = new StringBuilder("IndexItem{");
+        text.append("hashCode=").append(hashCode);
+        text.append(", topicId=").append(topicId);
+        text.append(", queueId=").append(queueId);
+        text.append(", offset=").append(offset);
+        text.append(", size=").append(size);
+        text.append(", timeDiff=").append(timeDiff);
+        text.append(", position=").append(itemIndex);
+        text.append('}');
+        return text.toString();
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
new file mode 100644
index 0000000000..d4eb854a2e
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+
+/**
+ * Contract for writing message keys into an index and looking them up again.
+ */
+public interface IndexService {
+
+    /**
+     * Puts a set of keys, all belonging to the same message, into the index.
+     *
+     * @param topic     The topic of the keys.
+     * @param topicId   The ID of the topic.
+     * @param queueId   The ID of the queue.
+     * @param keySet    The set of keys to be indexed.
+     * @param offset    The offset value stored with every key.
+     * @param size      The size value stored with every key.
+     * @param timestamp The timestamp stored with every key.
+     * @return The result of the put operation.
+     */
+    AppendResult putKey(
+        String topic, int topicId, int queueId, Set<String> keySet, long 
offset, int size, long timestamp);
+
+    /**
+     * Asynchronously queries the index for a specific key within a given time range.
+     *
+     * @param topic     The topic of the key.
+     * @param key       The key to be queried.
+     * @param maxCount  The maximum number of items requested by the caller.
+     *                  NOTE(review): how strictly an implementation enforces this
+     *                  bound is not visible from this interface - confirm there.
+     * @param beginTime The start time of the query range.
+     * @param endTime   The end time of the query range.
+     * @return A CompletableFuture that holds the list of IndexItems matching the query.
+     */
+    CompletableFuture<List<IndexItem>> queryAsync(String topic, String key, 
int maxCount, long beginTime, long endTime);
+
+    /**
+     * Shutdown the index service.
+     */
+    void shutdown();
+
+    /**
+     * Destroys the index service and releases all resources.
+     */
+    void destroy();
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
new file mode 100644
index 0000000000..52a686f685
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import com.google.common.base.Stopwatch;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+import static 
org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.SEALED;
+import static 
org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UNSEALED;
+import static 
org.apache.rocketmq.tieredstore.index.IndexFile.IndexStatusEnum.UPLOAD;
+import static 
org.apache.rocketmq.tieredstore.index.IndexItem.COMPACT_INDEX_ITEM_SIZE;
+import static 
org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_COMPACTED_DIRECTORY_NAME;
+import static 
org.apache.rocketmq.tieredstore.index.IndexStoreService.FILE_DIRECTORY_NAME;
+
+/**
+ * a single IndexFile in indexService
+ */
+public class IndexStoreFile implements IndexFile {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    /**
+     * Header format:
+     * magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4).
+     *
+     * The INDEX_* constants below are the absolute byte positions of those header
+     * fields inside the file buffer; INDEX_HEADER_SIZE is the total header length.
+     */
+    public static final int INDEX_MAGIC_CODE = 0;
+    public static final int INDEX_BEGIN_TIME_STAMP = 4;
+    public static final int INDEX_END_TIME_STAMP = 12;
+    public static final int INDEX_SLOT_COUNT = 20;
+    public static final int INDEX_ITEM_INDEX = 24;
+    public static final int INDEX_HEADER_SIZE = 28;
+
+    // Values written at INDEX_MAGIC_CODE: BEGIN while the file still accepts items,
+    // END once flushNewMetadata is called with end == true.
+    // Note on precedence: '+' binds tighter than '^' in Java, so these evaluate as
+    // 0xCCDDEEFF ^ (1880681586 + 4) and 0xCCDDEEFF ^ (1880681586 + 8).
+    public static final int BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
+    public static final int END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
+
+    /**
+     * Hash slot.
+     *
+     * INVALID_INDEX: a slot holding 0 (or less) has no item; putKey numbers items
+     * starting at 1, so 0 never refers to a real item.
+     * HASH_SLOT_SIZE: every slot occupies 8 bytes in the file. getSlotValue and putKey
+     * only read/write an int at the slot position.
+     * NOTE(review): the remaining 4 bytes are presumably reserved for the compacted
+     * layout - confirm against doCompaction.
+     * MAX_QUERY_COUNT: upper bound on the number of chained items a single query on a
+     * local file will walk.
+     */
+    private static final int INVALID_INDEX = 0;
+    private static final int HASH_SLOT_SIZE = Long.BYTES;
+    private static final int MAX_QUERY_COUNT = 512;
+
+    private final int hashSlotMaxCount;
+    private final int indexItemMaxCount;
+
+    private final ReadWriteLock fileReadWriteLock;
+    private final AtomicReference<IndexStatusEnum> fileStatus;
+    private final AtomicLong beginTimestamp = new AtomicLong(-1L);
+    private final AtomicLong endTimestamp = new AtomicLong(-1L);
+    private final AtomicInteger hashSlotCount = new AtomicInteger(0);
+    private final AtomicInteger indexItemCount = new AtomicInteger(0);
+
+    private MappedFile mappedFile;
+    private ByteBuffer byteBuffer;
+    private MappedFile compactMappedFile;
+    private TieredFileSegment fileSegment;
+
+    /**
+     * Opens (creating it if necessary) the local, writable index file named after
+     * {@code timestamp} under the index directory of the store root, and restores the
+     * counters kept in its header.
+     *
+     * @param storeConfig supplies the store root directory and the slot / item capacities
+     * @param timestamp   begin timestamp of the file; also used as the file name
+     * @throws IOException if the mapped file cannot be created
+     */
+    public IndexStoreFile(TieredMessageStoreConfig storeConfig, long timestamp) throws IOException {
+        this.hashSlotMaxCount = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+        this.indexItemMaxCount = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+        this.fileStatus = new AtomicReference<>(UNSEALED);
+        this.fileReadWriteLock = new ReentrantReadWriteLock();
+        // The file is sized to hold the header, the slot table and indexItemMaxCount item cells.
+        this.mappedFile = new DefaultMappedFile(
+            Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString(),
+            this.getItemPosition(indexItemMaxCount));
+        this.byteBuffer = this.mappedFile.getMappedByteBuffer();
+
+        this.beginTimestamp.set(timestamp);
+        // Restore the end timestamp from its own header field. Reading the begin-timestamp
+        // field here would make the flush below overwrite the persisted end timestamp of a
+        // reopened file with its begin timestamp.
+        this.endTimestamp.set(byteBuffer.getLong(INDEX_END_TIME_STAMP));
+        this.hashSlotCount.set(byteBuffer.getInt(INDEX_SLOT_COUNT));
+        this.indexItemCount.set(byteBuffer.getInt(INDEX_ITEM_INDEX));
+        this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1);
+    }
+
+    public IndexStoreFile(TieredMessageStoreConfig storeConfig, 
TieredFileSegment fileSegment) {
+        this.fileSegment = fileSegment;
+        this.fileStatus = new AtomicReference<>(UPLOAD);
+        this.fileReadWriteLock = new ReentrantReadWriteLock();
+
+        this.beginTimestamp.set(fileSegment.getMinTimestamp());
+        this.endTimestamp.set(fileSegment.getMaxTimestamp());
+        
this.hashSlotCount.set(storeConfig.getTieredStoreIndexFileMaxHashSlotNum());
+        
this.indexItemCount.set(storeConfig.getTieredStoreIndexFileMaxIndexNum());
+        this.hashSlotMaxCount = hashSlotCount.get();
+        this.indexItemMaxCount = indexItemCount.get();
+    }
+
+    /**
+     * Returns the begin timestamp of this file.
+     */
+    @Override
+    public long getTimestamp() {
+        return beginTimestamp.get();
+    }
+
+    /**
+     * Returns the end timestamp currently tracked for this file.
+     */
+    public long getEndTimestamp() {
+        return endTimestamp.get();
+    }
+
+    /**
+     * Returns the current value of the hash slot counter, widened to {@code long}.
+     */
+    public long getHashSlotCount() {
+        final int usedSlots = hashSlotCount.get();
+        return usedSlots;
+    }
+
+    /**
+     * Returns the current value of the index item counter, widened to {@code long}.
+     */
+    public long getIndexItemCount() {
+        final int storedItems = indexItemCount.get();
+        return storedItems;
+    }
+
+    /**
+     * Returns the current lifecycle status of this file.
+     */
+    @Override
+    public IndexStatusEnum getFileStatus() {
+        return fileStatus.get();
+    }
+
+    /**
+     * Builds the string that is hashed for a key: {@code topic}, a {@code '#'} separator,
+     * then {@code key}.
+     *
+     * <p>Plain concatenation yields exactly the same string as the former
+     * {@code String.format("%s#%s", ...)} (including {@code "null"} for null arguments)
+     * without parsing a format string for every key on the write path.
+     */
+    protected String buildKey(String topic, String key) {
+        return topic + "#" + key;
+    }
+
+    /**
+     * Returns a non-negative hash for {@code keyStr}, suitable for {@code hash % slotCount}.
+     *
+     * <p>{@code Integer.MIN_VALUE} has no positive counterpart: both {@code -MIN_VALUE} and
+     * {@code Math.abs(MIN_VALUE)} are still negative. It is therefore mapped to 0, so a key
+     * such as {@code "polygenelubricants"} (whose hash is {@code Integer.MIN_VALUE}) cannot
+     * produce a negative slot index.
+     *
+     * @param keyStr the full key, must not be null
+     * @return a value in {@code [0, Integer.MAX_VALUE]}
+     */
+    protected int hashCode(String keyStr) {
+        int keyHash = keyStr.hashCode();
+        return keyHash == Integer.MIN_VALUE ? 0 : Math.abs(keyHash);
+    }
+
+    protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) {
+        byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : 
END_MAGIC_CODE);
+        byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
+        byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
+        byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
+        byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
+    }
+
+    protected int getSlotPosition(int slotIndex) {
+        return INDEX_HEADER_SIZE + slotIndex * HASH_SLOT_SIZE;
+    }
+
+    protected int getSlotValue(int slotPosition) {
+        return Math.max(this.byteBuffer.getInt(slotPosition), INVALID_INDEX);
+    }
+
+    protected int getItemPosition(int itemIndex) {
+        return INDEX_HEADER_SIZE + hashSlotMaxCount * HASH_SLOT_SIZE + 
itemIndex * IndexItem.INDEX_ITEM_SIZE;
+    }
+
+    /**
+     * Appends one index item per key in {@code keySet} to this file and links each item
+     * into the chain of its hash slot.
+     *
+     * @param topic     topic the keys belong to; a blank topic is rejected
+     * @param topicId   stored in every item
+     * @param queueId   stored in every item
+     * @param keySet    keys to index; null or empty is a no-op
+     * @param offset    stored in every item
+     * @param size      stored in every item
+     * @param timestamp stored as seconds relative to the begin timestamp of this file
+     * @return {@code SUCCESS} when all keys were written or there was nothing to write;
+     *         {@code FILE_FULL} when the file is not {@code UNSEALED} or cannot take the
+     *         whole key set (the file is then marked {@code SEALED});
+     *         {@code UNKNOWN_ERROR} for a blank topic or when writing fails
+     */
+    @Override
+    public AppendResult putKey(
+        String topic, int topicId, int queueId, Set<String> keySet, long offset, int size, long timestamp) {
+
+        if (StringUtils.isBlank(topic)) {
+            return AppendResult.UNKNOWN_ERROR;
+        }
+
+        if (keySet == null || keySet.isEmpty()) {
+            return AppendResult.SUCCESS;
+        }
+
+        // Acquire the lock before entering try: if lock() itself failed inside the try
+        // block, the finally clause would call unlock() on a lock this thread does not hold.
+        fileReadWriteLock.writeLock().lock();
+        try {
+            if (!UNSEALED.equals(fileStatus.get())) {
+                return AppendResult.FILE_FULL;
+            }
+
+            if (this.indexItemCount.get() + keySet.size() >= this.indexItemMaxCount) {
+                this.fileStatus.set(IndexStatusEnum.SEALED);
+                return AppendResult.FILE_FULL;
+            }
+
+            // Identical for every key of this call, so computed once.
+            int timeDiff = (int) ((timestamp - this.beginTimestamp.get()) / 1000L);
+
+            for (String key : keySet) {
+                int hashCode = this.hashCode(this.buildKey(topic, key));
+                int slotIndex = hashCode % this.hashSlotMaxCount;
+                int slotPosition = this.getSlotPosition(slotIndex);
+                int slotOldValue = this.getSlotValue(slotPosition);
+
+                // The new item remembers the previous head of the slot and becomes the new head.
+                IndexItem indexItem = new IndexItem(
+                    topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue);
+                int itemIndex = this.indexItemCount.incrementAndGet();
+                this.byteBuffer.position(this.getItemPosition(itemIndex));
+                this.byteBuffer.put(indexItem.getByteBuffer());
+                this.byteBuffer.putInt(slotPosition, itemIndex);
+
+                if (slotOldValue <= INVALID_INDEX) {
+                    this.hashSlotCount.incrementAndGet();
+                }
+                if (this.endTimestamp.get() < timestamp) {
+                    this.endTimestamp.set(timestamp);
+                }
+                this.flushNewMetadata(byteBuffer, indexItemMaxCount == this.indexItemCount.get() + 1);
+
+                log.trace("IndexStoreFile put key, timestamp: {}, topic: {}, key: {}, slot: {}, item: {}, previous item: {}, content: {}",
+                    this.getTimestamp(), topic, key, slotIndex, itemIndex, slotOldValue, indexItem);
+            }
+            return AppendResult.SUCCESS;
+        } catch (Exception e) {
+            log.error("IndexStoreFile put key error, topic: {}, topicId: {}, queueId: {}, keySet: {}, offset: {}, " +
+                "size: {}, timestamp: {}", topic, topicId, queueId, keySet, offset, size, timestamp, e);
+        } finally {
+            fileReadWriteLock.writeLock().unlock();
+        }
+
+        return AppendResult.UNKNOWN_ERROR;
+    }
+
+    /**
+     * Dispatches a key lookup to the reader that matches the current file status.
+     * <p>
+     * UNSEALED and SEALED files are read from the local mapped file, UPLOAD files are read
+     * from the tiered-storage segment, and any other status yields an empty list.
+     *
+     * @param topic     topic the key belongs to; combined with {@code key} via {@code buildKey}
+     * @param key       message key to look up
+     * @param maxCount  maximum number of items requested
+     * @param beginTime lower bound of the queried time range
+     * @param endTime   upper bound of the queried time range
+     * @return a future holding the matching index items
+     */
+    @Override
+    public CompletableFuture<List<IndexItem>> queryAsync(
+        String topic, String key, int maxCount, long beginTime, long endTime) {
+
+        final IndexStatusEnum status = this.fileStatus.get();
+        switch (status) {
+            case UPLOAD:
+                return this.queryAsyncFromSegmentFile(buildKey(topic, key), maxCount, beginTime, endTime);
+            case UNSEALED:
+            case SEALED:
+                return this.queryAsyncFromUnsealedFile(buildKey(topic, key), maxCount, beginTime, endTime);
+            default:
+                // SHUTDOWN (and anything unexpected): nothing can be read any more.
+                return CompletableFuture.completedFuture(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Queries the locally mapped index file (status UNSEALED or SEALED) on the fetch executor.
+     * <p>
+     * The hash slot of the key points at the most recently written item; each item stores the
+     * index of the previous item of the same slot, so the chain is followed backwards until it
+     * ends, {@code MAX_QUERY_COUNT} items were visited or {@code maxCount} matches were found.
+     * <p>
+     * NOTE(review): {@code beginTime} and {@code endTime} are only logged here; unlike the
+     * segment-file path no time filtering is applied to the returned items.
+     *
+     * @param key       full index key, as produced by {@code buildKey(topic, key)}
+     * @param maxCount  maximum number of items to return
+     * @param beginTime lower bound of the queried time range (logged only)
+     * @param endTime   upper bound of the queried time range (logged only)
+     * @return a future holding the matching items; empty when the file cannot be read
+     */
+    protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile(
+        String key, int maxCount, long beginTime, long endTime) {
+
+        return CompletableFuture.supplyAsync(() -> {
+            List<IndexItem> result = new ArrayList<>();
+            // Set only after hold() succeeded, so release() is never called on a null
+            // mapped file or for a reference that was not acquired.
+            boolean held = false;
+            // Acquire the lock before entering try so that finally never unlocks an unheld lock.
+            fileReadWriteLock.readLock().lock();
+            try {
+                if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
+                    return result;
+                }
+
+                if (mappedFile == null || !mappedFile.hold()) {
+                    return result;
+                }
+                held = true;
+
+                int hashCode = this.hashCode(key);
+                int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
+                int slotValue = this.getSlotValue(slotPosition);
+
+                int left = MAX_QUERY_COUNT;
+                while (left > 0 &&
+                    slotValue > INVALID_INDEX &&
+                    slotValue <= this.indexItemCount.get()) {
+
+                    byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
+                    // Work on a duplicate so concurrent readers do not share position state.
+                    ByteBuffer buffer = this.byteBuffer.duplicate();
+                    buffer.position(this.getItemPosition(slotValue));
+                    buffer.get(bytes);
+                    IndexItem indexItem = new IndexItem(bytes);
+                    if (hashCode == indexItem.getHashCode()) {
+                        result.add(indexItem);
+                        // Stop once maxCount items are collected (was '>', which let one extra item through).
+                        if (result.size() >= maxCount) {
+                            break;
+                        }
+                    }
+                    // Follow the chain to the previous item stored in the same hash slot.
+                    slotValue = indexItem.getItemIndex();
+                    left--;
+                }
+
+                log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
+                        "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+                    getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
+            } catch (Exception e) {
+                log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
+                    "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
+            } finally {
+                // Release while still holding the read lock: shutdown() clears mappedFile
+                // under the write lock, so the reference cannot become null in between.
+                if (held) {
+                    mappedFile.release();
+                }
+                fileReadWriteLock.readLock().unlock();
+            }
+            return result;
+        }, TieredStoreExecutor.fetchDataExecutor);
+    }
+
+    /**
+     * Queries an uploaded (compacted) index file through its tiered-storage file segment.
+     * <p>
+     * The hash slot of the key is read first; in the compacted layout it holds the position and
+     * the total byte length of the items of that slot. Those items are then fetched with a single
+     * read and filtered by hash code and by store time within {@code [beginTime, endTime]}.
+     *
+     * @param key       full index key, as produced by {@code buildKey(topic, key)}
+     * @param maxCount  maximum number of items to return
+     * @param beginTime inclusive lower bound of the store time, in milliseconds
+     * @param endTime   inclusive upper bound of the store time, in milliseconds
+     * @return a future holding the matching items; empty when the file is not in UPLOAD status,
+     *         the slot is empty or the returned buffers are malformed
+     */
+    protected CompletableFuture<List<IndexItem>> queryAsyncFromSegmentFile(
+        String key, int maxCount, long beginTime, long endTime) {
+
+        if (this.fileSegment == null || !UPLOAD.equals(this.fileStatus.get())) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        int hashCode = this.hashCode(key);
+        int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
+
+        CompletableFuture<List<IndexItem>> future = this.fileSegment.readAsync(slotPosition, HASH_SLOT_SIZE)
+            .thenCompose(slotBuffer -> {
+                if (slotBuffer.remaining() < HASH_SLOT_SIZE) {
+                    log.error("IndexStoreFile query from tiered storage return error slot buffer, " +
+                        "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime);
+                    return CompletableFuture.completedFuture(null);
+                }
+                int indexPosition = slotBuffer.getInt();
+                // Cap a single read at 1024 compacted items.
+                int indexTotalSize = Math.min(slotBuffer.getInt(), COMPACT_INDEX_ITEM_SIZE * 1024);
+                if (indexPosition <= INVALID_INDEX || indexTotalSize <= 0) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                return this.fileSegment.readAsync(indexPosition, indexTotalSize);
+            })
+            .thenApply(itemBuffer -> {
+                List<IndexItem> result = new ArrayList<>();
+                if (itemBuffer == null) {
+                    return result;
+                }
+
+                if (itemBuffer.remaining() % COMPACT_INDEX_ITEM_SIZE != 0) {
+                    log.error("IndexStoreFile query from tiered storage return error item buffer, " +
+                        "key: {}, maxCount: {}, timestamp={}-{}", key, maxCount, beginTime, endTime);
+                    return result;
+                }
+
+                int size = itemBuffer.remaining() / COMPACT_INDEX_ITEM_SIZE;
+                byte[] bytes = new byte[COMPACT_INDEX_ITEM_SIZE];
+                for (int i = 0; i < size; i++) {
+                    itemBuffer.get(bytes);
+                    IndexItem indexItem = new IndexItem(bytes);
+                    // putKey stores the time diff in seconds, so convert it back to
+                    // milliseconds before adding it to the millisecond begin timestamp.
+                    long storeTimestamp =
+                        TimeUnit.SECONDS.toMillis(indexItem.getTimeDiff()) + beginTimestamp.get();
+                    if (hashCode == indexItem.getHashCode() &&
+                        beginTime <= storeTimestamp && storeTimestamp <= endTime &&
+                        result.size() < maxCount) {
+                        result.add(indexItem);
+                    }
+                }
+                return result;
+            });
+
+        return future.whenComplete((result, throwable) -> {
+            long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            if (throwable != null) {
+                log.error("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, " +
+                        "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+                    costTime, getTimestamp(), key, hashCode, maxCount, beginTime, endTime, throwable);
+            } else {
+                String details = Optional.ofNullable(result)
+                    .map(r -> r.stream()
+                        .map(item -> String.format("%d-%d", item.getQueueId(), item.getOffset()))
+                        .collect(Collectors.joining(", ")))
+                    .orElse("");
+
+                log.debug("IndexStoreFile query from segment file, cost: {}ms, timestamp: {}, result size: {}, ({}), " +
+                        "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+                    costTime, getTimestamp(), result != null ? result.size() : 0, details, key, hashCode, maxCount, beginTime, endTime);
+            }
+        });
+    }
+
+    @Override
+    public ByteBuffer doCompaction() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        ByteBuffer buffer;
+        try {
+            buffer = compactToNewFile();
+            log.debug("IndexStoreFile do compaction, timestamp: {}, file size: 
{}, cost: {}ms",
+                this.getTimestamp(), buffer.capacity(), 
stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        } catch (Exception e) {
+            log.error("IndexStoreFile do compaction, timestamp: {}, cost: 
{}ms",
+                this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), 
e);
+            return null;
+        }
+
+        try {
+            // Make sure there is no read request here
+            fileReadWriteLock.writeLock().lock();
+            fileStatus.set(IndexStatusEnum.SEALED);
+        } catch (Exception e) {
+            log.error("IndexStoreFile change file status to sealed error, 
timestamp={}", this.getTimestamp());
+        } finally {
+            fileReadWriteLock.writeLock().unlock();
+        }
+        return buffer;
+    }
+
+    /**
+     * Builds the path of the compacted copy of this file: a file named after this file's
+     * timestamp inside the compaction sub-directory next to the mapped file.
+     *
+     * @return the compacted file path as a string
+     */
+    protected String getCompactedFilePath() {
+        String parentDirectory = Paths.get(this.mappedFile.getFileName()).getParent().toString();
+        return Paths.get(
+            parentDirectory, FILE_COMPACTED_DIRECTORY_NAME, String.valueOf(this.getTimestamp())).toString();
+    }
+
+    /**
+     * Rewrites this index file into a new mapped file using the compacted layout.
+     * <p>
+     * For every hash slot the item chain is followed and the first
+     * {@code COMPACT_INDEX_ITEM_SIZE} bytes of each item are written back to back, so the items
+     * of one slot become contiguous. The slot itself is rewritten to hold the begin position
+     * and the byte length of that run.
+     *
+     * @return the mapped buffer of the compacted file, flipped for reading
+     * @throws IOException if the compacted mapped file cannot be created
+     */
+    protected ByteBuffer compactToNewFile() throws IOException {
+
+        byte[] payload = new byte[IndexItem.INDEX_ITEM_SIZE];
+        // View over the same array, used to read the chain pointer out of the copied item.
+        ByteBuffer payloadBuffer = ByteBuffer.wrap(payload);
+        int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount * HASH_SLOT_SIZE);
+        int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE * indexItemCount.get();
+
+        compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), fileMaxLength);
+        MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer();
+
+        for (int i = 0; i < hashSlotMaxCount; i++) {
+            int slotPosition = this.getSlotPosition(i);
+            int slotValue = this.getSlotValue(slotPosition);
+            int writeBeginPosition = writePosition;
+
+            // The length bound also stops the walk if the chain contains more entries than expected.
+            while (slotValue > INVALID_INDEX && writePosition < fileMaxLength) {
+                ByteBuffer buffer = this.byteBuffer.duplicate();
+                buffer.position(this.getItemPosition(slotValue));
+                buffer.get(payload);
+                // The int stored right after the compact part is the index of the next item in the chain.
+                int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
+                newBuffer.position(writePosition);
+                newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
+                log.trace("IndexStoreFile do compaction, write item, slot: {}, current: {}, next: {}", i, slotValue, newSlotValue);
+                slotValue = newSlotValue;
+                writePosition += COMPACT_INDEX_ITEM_SIZE;
+            }
+
+            int length = writePosition - writeBeginPosition;
+            newBuffer.putInt(slotPosition, writeBeginPosition);
+            newBuffer.putInt(slotPosition + Integer.BYTES, length);
+
+            if (length > 0) {
+                log.trace("IndexStoreFile do compaction, write slot, slot: {}, begin: {}, length: {}", i, writeBeginPosition, length);
+            }
+        }
+
+        this.flushNewMetadata(newBuffer, true);
+        newBuffer.flip();
+        return newBuffer;
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            fileReadWriteLock.writeLock().lock();
+            this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
+            if (this.mappedFile != null) {
+                this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+                this.mappedFile = null;
+            }
+            if (this.compactMappedFile != null) {
+                this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+                this.compactMappedFile = null;
+            }
+        } catch (Exception e) {
+            log.error("IndexStoreFile shutdown failed, timestamp: {}, status: 
{}", this.getTimestamp(), fileStatus.get(), e);
+        } finally {
+            fileReadWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void destroy() {
+        try {
+            fileReadWriteLock.writeLock().lock();
+            this.shutdown();
+            switch (this.fileStatus.get()) {
+                case SHUTDOWN:
+                case UNSEALED:
+                case SEALED:
+                    if (this.mappedFile != null) {
+                        this.mappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
+                    }
+                    if (this.compactMappedFile != null) {
+                        
this.compactMappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
+                    }
+                    log.info("IndexStoreService destroy local file, timestamp: 
{}, status: {}", this.getTimestamp(), fileStatus.get());
+                    break;
+                case UPLOAD:
+                    log.warn("[BUG] IndexStoreService destroy remote file, 
timestamp: {}", this.getTimestamp());
+            }
+        } catch (Exception e) {
+            log.error("IndexStoreService destroy failed, timestamp: {}, 
status: {}", this.getTimestamp(), fileStatus.get(), e);
+        } finally {
+            fileReadWriteLock.writeLock().unlock();
+        }
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
new file mode 100644
index 0000000000..14608aa58d
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
+import org.apache.rocketmq.tieredstore.file.TieredFlatFile;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+public class IndexStoreService extends ServiceThread implements IndexService {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    public static final String FILE_DIRECTORY_NAME = "tiered_index_file";
+    public static final String FILE_COMPACTED_DIRECTORY_NAME = "compacting";
+
+    /**
+     * File status in table example:
+     * upload, upload, upload, sealed, sealed, unsealed
+     */
+    private final TieredMessageStoreConfig storeConfig;
+    private final ConcurrentSkipListMap<Long /* timestamp */, IndexFile> 
timeStoreTable;
+    private final ReadWriteLock readWriteLock;
+    private final AtomicLong compactTimestamp;
+    private final String filePath;
+    private final TieredFileAllocator fileAllocator;
+
+    private IndexFile currentWriteFile;
+    private TieredFlatFile flatFile;
+
+    public IndexStoreService(TieredFileAllocator fileAllocator, String 
filePath) {
+        this.storeConfig = fileAllocator.getStoreConfig();
+        this.filePath = filePath;
+        this.fileAllocator = fileAllocator;
+        this.timeStoreTable = new ConcurrentSkipListMap<>();
+        this.compactTimestamp = new AtomicLong(0L);
+        this.readWriteLock = new ReentrantReadWriteLock();
+        this.recover();
+    }
+
+    private void doConvertOldFormatFile(String filePath) {
+        try {
+            File file = new File(filePath);
+            if (!file.exists()) {
+                return;
+            }
+            MappedFile mappedFile = new DefaultMappedFile(file.getPath(), 
(int) file.length());
+            long timestamp = 
mappedFile.getMappedByteBuffer().getLong(IndexStoreFile.INDEX_BEGIN_TIME_STAMP);
+            if (timestamp <= 0) {
+                mappedFile.destroy(TimeUnit.SECONDS.toMillis(10));
+            } else {
+                mappedFile.renameTo(String.valueOf(new File(file.getParent(), 
String.valueOf(timestamp))));
+                mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+            }
+        } catch (Exception e) {
+            log.error("IndexStoreService do convert old format error, file: 
{}", filePath, e);
+        }
+    }
+
+    private void recover() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        // recover local
+        File dir = new File(Paths.get(storeConfig.getStorePathRootDir(), 
FILE_DIRECTORY_NAME).toString());
+        this.doConvertOldFormatFile(Paths.get(dir.getPath(), 
"0000").toString());
+        this.doConvertOldFormatFile(Paths.get(dir.getPath(), 
"1111").toString());
+        File[] files = dir.listFiles();
+
+        if (files != null) {
+            List<File> fileList = Arrays.asList(files);
+            fileList.sort(Comparator.comparing(File::getName));
+
+            for (File file : fileList) {
+                if (file.isDirectory() || 
!StringUtils.isNumeric(file.getName())) {
+                    continue;
+                }
+
+                try {
+                    IndexFile indexFile = new IndexStoreFile(storeConfig, 
Long.parseLong(file.getName()));
+                    timeStoreTable.put(indexFile.getTimestamp(), indexFile);
+                    log.info("IndexStoreService recover load local file, 
timestamp: {}", indexFile.getTimestamp());
+                } catch (Exception e) {
+                    log.error("IndexStoreService recover, load local file 
error", e);
+                }
+            }
+        }
+
+        if (this.timeStoreTable.isEmpty()) {
+            this.createNewIndexFile(System.currentTimeMillis());
+        }
+
+        this.currentWriteFile = this.timeStoreTable.lastEntry().getValue();
+        this.setCompactTimestamp(this.timeStoreTable.firstKey() - 1);
+
+        // recover remote
+        this.flatFile = fileAllocator.createFlatFileForIndexFile(filePath);
+        if (this.flatFile.getBaseOffset() == -1) {
+            this.flatFile.setBaseOffset(0);
+        }
+
+        for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
+            IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
+            timeStoreTable.put(indexFile.getTimestamp(), indexFile);
+            log.info("IndexStoreService recover load remote file, timestamp: 
{}", indexFile.getTimestamp());
+        }
+
+        log.info("IndexStoreService recover finished, entrySize: {}, cost: 
{}ms, directory: {}",
+            timeStoreTable.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS), 
dir.getAbsolutePath());
+    }
+
+    public void createNewIndexFile(long timestamp) {
+        try {
+            this.readWriteLock.writeLock().lock();
+            IndexFile indexFile = this.currentWriteFile;
+            if (this.timeStoreTable.containsKey(timestamp) ||
+                indexFile != null && 
IndexFile.IndexStatusEnum.UNSEALED.equals(indexFile.getFileStatus())) {
+                return;
+            }
+            IndexStoreFile newStoreFile = new IndexStoreFile(storeConfig, 
timestamp);
+            this.timeStoreTable.put(timestamp, newStoreFile);
+            this.currentWriteFile = newStoreFile;
+            log.info("IndexStoreService construct next file, timestamp: {}", 
timestamp);
+        } catch (Exception e) {
+            log.error("IndexStoreService construct next file, timestamp: {}", 
timestamp, e);
+        } finally {
+            this.readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @VisibleForTesting
+    public ConcurrentSkipListMap<Long, IndexFile> getTimeStoreTable() {
+        return timeStoreTable;
+    }
+
+    @Override
+    public AppendResult putKey(
+        String topic, int topicId, int queueId, Set<String> keySet, long 
offset, int size, long timestamp) {
+
+        if (StringUtils.isBlank(topic)) {
+            return AppendResult.UNKNOWN_ERROR;
+        }
+
+        if (keySet == null || keySet.isEmpty()) {
+            return AppendResult.SUCCESS;
+        }
+
+        for (int i = 0; i < 3; i++) {
+            AppendResult result = this.currentWriteFile.putKey(
+                topic, topicId, queueId, keySet, offset, size, timestamp);
+
+            if (AppendResult.SUCCESS.equals(result)) {
+                return AppendResult.SUCCESS;
+            } else if (AppendResult.FILE_FULL.equals(result)) {
+                this.createNewIndexFile(timestamp);
+            }
+        }
+
+        log.error("IndexStoreService put key three times return error, topic: 
{}, topicId: {}, " +
+            "queueId: {}, keySize: {}, timestamp: {}", topic, topicId, 
queueId, keySet.size(), timestamp);
+        return AppendResult.UNKNOWN_ERROR;
+    }
+
+    @Override
+    public CompletableFuture<List<IndexItem>> queryAsync(
+        String topic, String key, int maxCount, long beginTime, long endTime) {
+
+        CompletableFuture<List<IndexItem>> future = new CompletableFuture<>();
+        try {
+            readWriteLock.readLock().lock();
+            ConcurrentNavigableMap<Long, IndexFile> pendingMap =
+                this.timeStoreTable.subMap(beginTime, true, endTime, true);
+            List<CompletableFuture<Void>> futureList = new 
ArrayList<>(pendingMap.size());
+            ConcurrentHashMap<String /* queueId-offset */, IndexItem> result = 
new ConcurrentHashMap<>();
+
+            for (Map.Entry<Long, IndexFile> entry : 
pendingMap.descendingMap().entrySet()) {
+                CompletableFuture<Void> completableFuture = entry.getValue()
+                    .queryAsync(topic, key, maxCount, beginTime, endTime)
+                    .thenAccept(itemList -> itemList.forEach(indexItem -> {
+                        if (result.size() < maxCount) {
+                            result.put(String.format(
+                                "%d-%d", indexItem.getQueueId(), 
indexItem.getOffset()), indexItem);
+                        }
+                    }));
+                futureList.add(completableFuture);
+            }
+
+            CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0]))
+                .whenComplete((v, t) -> {
+                    // Try to return the query results as much as possible here
+                    // rather than directly throwing exceptions
+                    if (result.isEmpty() && t != null) {
+                        future.completeExceptionally(t);
+                    } else {
+                        List<IndexItem> resultList = new 
ArrayList<>(result.values());
+                        future.complete(resultList.subList(0, 
Math.min(resultList.size(), maxCount)));
+                    }
+                });
+        } catch (Exception e) {
+            future.completeExceptionally(e);
+        } finally {
+            readWriteLock.readLock().unlock();
+        }
+        return future;
+    }
+
+    public void doCompactThenUploadFile(IndexFile indexFile) {
+        if 
(IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
+            log.error("IndexStoreService file status not correct, so skip, 
timestamp: {}, status: {}",
+                indexFile.getTimestamp(), indexFile.getFileStatus());
+            return;
+        }
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        ByteBuffer byteBuffer = indexFile.doCompaction();
+        if (byteBuffer == null) {
+            log.error("IndexStoreService found compaction buffer is null, 
timestamp: {}", indexFile.getTimestamp());
+            return;
+        }
+        flatFile.append(byteBuffer);
+        flatFile.commit(true);
+
+        TieredFileSegment fileSegment = 
flatFile.getFileByIndex(flatFile.getFileSegmentCount() - 1);
+        if (fileSegment == null || fileSegment.getMinTimestamp() != 
indexFile.getTimestamp()) {
+            log.warn("IndexStoreService submit compacted file to server 
failed, timestamp: {}", indexFile.getTimestamp());
+            return;
+        }
+
+        try {
+            readWriteLock.writeLock().lock();
+            IndexFile storeFile = new IndexStoreFile(storeConfig, fileSegment);
+            timeStoreTable.put(indexFile.getTimestamp(), storeFile);
+            indexFile.destroy();
+        } catch (Exception e) {
+            log.error("IndexStoreService switch file failed, timestamp: {}, 
cost: {}ms",
+                indexFile.getTimestamp(), 
stopwatch.elapsed(TimeUnit.MILLISECONDS), e);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    public void destroyExpiredFile(long expireTimestamp) {
+        flatFile.cleanExpiredFile(expireTimestamp);
+        flatFile.destroyExpiredFile();
+    }
+
+    public void destroy() {
+        try {
+            readWriteLock.writeLock().lock();
+
+            // delete local store file
+            for (Map.Entry<Long, IndexFile> entry : timeStoreTable.entrySet()) 
{
+                IndexFile indexFile = entry.getValue();
+                if 
(IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
+                    continue;
+                }
+                indexFile.destroy();
+            }
+
+            // delete remote
+            if (flatFile != null) {
+                flatFile.destroy();
+            }
+        } catch (Exception e) {
+            log.error("IndexStoreService destroy all file error", e);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public String getServiceName() {
+        return IndexStoreService.class.getSimpleName();
+    }
+
+    public void setCompactTimestamp(long timestamp) {
+        this.compactTimestamp.set(timestamp);
+        log.info("IndexStoreService compact timestamp has been set to: {}", 
timestamp);
+    }
+
+    protected IndexFile getNextSealedFile() {
+        try {
+            Map.Entry<Long, IndexFile> entry =
+                this.timeStoreTable.higherEntry(this.compactTimestamp.get());
+            if (entry != null && entry.getKey() < 
this.timeStoreTable.lastKey()) {
+                return entry.getValue();
+            }
+        } catch (Throwable e) {
+            log.error("Error occurred in " + getServiceName(), e);
+        }
+        return null;
+    }
+
+    @Override
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+        while (!this.isStopped()) {
+            long expireTimestamp = System.currentTimeMillis()
+                - 
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
+            this.destroyExpiredFile(expireTimestamp);
+
+            IndexFile indexFile = this.getNextSealedFile();
+            if (indexFile == null) {
+                this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
+                continue;
+            }
+            this.doCompactThenUploadFile(indexFile);
+            this.setCompactTimestamp(indexFile.getTimestamp());
+        }
+        log.info(this.getServiceName() + " service shutdown");
+    }
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        for (Map.Entry<Long /* timestamp */, IndexFile> entry : 
timeStoreTable.entrySet()) {
+            entry.getValue().shutdown();
+        }
+        this.timeStoreTable.clear();
+        log.info("IndexStoreService shutdown gracefully");
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 32911a6e89..aad42de98d 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -31,12 +31,14 @@ import 
org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
 import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
 import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
 import 
org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
+import static 
org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_BEGIN_TIME_STAMP;
+import static 
org.apache.rocketmq.tieredstore.index.IndexStoreFile.INDEX_END_TIME_STAMP;
+
 public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>, TieredStoreProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
@@ -198,8 +200,9 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
             }
 
             if (fileType == FileSegmentType.INDEX) {
-                minTimestamp = 
byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
-                maxTimestamp = 
byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+                minTimestamp = byteBuf.getLong(INDEX_BEGIN_TIME_STAMP);
+                maxTimestamp = byteBuf.getLong(INDEX_END_TIME_STAMP);
+
                 appendPosition += byteBuf.remaining();
                 // IndexFile is large and not change after compaction, no need 
deep copy
                 bufferList.add(byteBuf);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 0db3eaf8f4..b9938b7a8a 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -59,7 +59,7 @@ public interface TieredStoreProvider {
      * Get data from backend file system
      *
      * @param position the index from where the file will be read
-     * @param length the data size will be read
+     * @param length   the data size will be read
      * @return data to be read
      */
     CompletableFuture<ByteBuffer> read0(long position, int length);
@@ -68,10 +68,10 @@ public interface TieredStoreProvider {
      * Put data to backend file system
      *
      * @param inputStream data stream
-     * @param position backend file position to put, used in append mode
-     * @param length data size in stream
-     * @param append try to append or create a new file
+     * @param position    backend file position to put, used in append mode
+     * @param length      data size in stream
+     * @param append      try to append or create a new file
      * @return put result, <code>true</code> if data successfully write; 
<code>false</code> otherwise
      */
-    CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long 
position, int length, boolean append);
+    CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream, 
long position, int length, boolean append);
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 7e949cb28c..ee56b1e68b 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -159,6 +159,7 @@ public class PosixFileSegment extends TieredFileSegment {
             readFileChannel.position(position);
             readFileChannel.read(byteBuffer);
             byteBuffer.flip();
+            byteBuffer.limit(length);
 
             attributesBuilder.put(LABEL_SUCCESS, true);
             long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 774c6cf646..4e0d7e6979 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -37,7 +37,6 @@ import 
org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
 import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
 import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
-import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -83,6 +82,7 @@ public class TieredMessageFetcherTest {
         Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 
getMessageResult.getStatus());
 
         CompositeFlatFile flatFile = 
flatFileManager.getOrCreateFlatFileIfAbsent(mq);
+        Assert.assertNotNull(flatFile);
         flatFile.initOffset(0);
 
         getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), 
mq.getQueueId(), 0, 32, null).join();
@@ -197,6 +197,7 @@ public class TieredMessageFetcherTest {
     public void testGetMessageStoreTimeStampAsync() {
         TieredMessageFetcher fetcher = new TieredMessageFetcher(storeConfig);
         CompositeFlatFile flatFile = 
TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq);
+        Assert.assertNotNull(flatFile);
         flatFile.initOffset(0);
 
         ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
@@ -270,6 +271,7 @@ public class TieredMessageFetcherTest {
         CompositeQueueFlatFile flatFile = 
TieredFlatFileManager.getInstance(storeConfig).getOrCreateFlatFileIfAbsent(mq);
         Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 
32, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
 
+        Assert.assertNotNull(flatFile);
         flatFile.initOffset(0);
         ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
@@ -281,20 +283,19 @@ public class TieredMessageFetcherTest {
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2);
         flatFile.appendCommitLog(buffer);
 
-        DispatchRequest request = new DispatchRequest(mq.getTopic(), 
mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "key", 0, 0, 
null);
+        long timestamp = System.currentTimeMillis();
+        DispatchRequest request = new DispatchRequest(mq.getTopic(), 
mq.getQueueId(), 0, MessageBufferUtilTest.MSG_LEN, 0, timestamp, 0, "", "key", 
0, 0, null);
         flatFile.appendIndexFile(request);
-        request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 
MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", 
"key", 0, 0, null);
+        request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 
MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN, 0, timestamp + 1, 
0, "", "key", 0, 0, null);
         flatFile.appendIndexFile(request);
-        request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 
MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", 
"another-key", 0, 0, null);
+        request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 
MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, timestamp 
+ 2, 0, "", "another-key", 0, 0, null);
         flatFile.appendIndexFile(request);
         flatFile.commit(true);
-        TieredIndexFile indexFile = 
TieredFlatFileManager.getIndexFile(storeConfig);
-        indexFile.commit(true);
         Assert.assertEquals(1, fetcher.queryMessageAsync(mq.getTopic(), "key", 
1, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
 
         QueryMessageResult result = fetcher.queryMessageAsync(mq.getTopic(), 
"key", 32, 0, Long.MAX_VALUE).join();
         Assert.assertEquals(2, result.getMessageMapedList().size());
-        Assert.assertEquals(1, 
result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
-        Assert.assertEquals(0, 
result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
+        Assert.assertEquals(0, 
result.getMessageMapedList().get(0).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
+        Assert.assertEquals(1, 
result.getMessageMapedList().get(1).getByteBuffer().getLong(MessageBufferUtil.QUEUE_OFFSET_POSITION));
     }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
deleted file mode 100644
index 2da72bc7a7..0000000000
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.file;
-
-import com.sun.jna.Platform;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.List;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TieredIndexFileTest {
-
-    private final String storePath = TieredStoreTestUtil.getRandomStorePath();
-    private MessageQueue mq;
-    private TieredMessageStoreConfig storeConfig;
-
-    @Before
-    public void setUp() {
-        storeConfig = new TieredMessageStoreConfig();
-        storeConfig.setBrokerName("IndexFileBroker");
-        storeConfig.setStorePathRootDir(storePath);
-        
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
-        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
-        storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
-        mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1);
-        TieredStoreUtil.getMetadataStore(storeConfig);
-        TieredStoreExecutor.init();
-    }
-
-    @After
-    public void tearDown() throws IOException {
-        TieredStoreTestUtil.destroyMetadataStore();
-        TieredStoreTestUtil.destroyTempDir(storePath);
-        TieredStoreExecutor.shutdown();
-    }
-
-    @Test
-    public void testAppendAndQuery() throws IOException, 
ClassNotFoundException, NoSuchMethodException {
-        if (Platform.isWindows()) {
-            return;
-        }
-
-        TieredFileAllocator fileQueueFactory = new 
TieredFileAllocator(storeConfig);
-        TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory, 
storePath);
-
-        indexFile.append(mq, 0, "key3", 3, 300, 1000);
-        indexFile.append(mq, 0, "key2", 2, 200, 1100);
-        indexFile.append(mq, 0, "key1", 1, 100, 1200);
-
-        // do not do schedule task here
-        TieredStoreExecutor.shutdown();
-        List<Pair<Long, ByteBuffer>> indexList =
-            indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
-        Assert.assertEquals(0, indexList.size());
-
-        // do compaction once
-        TieredStoreExecutor.init();
-        storeConfig.setTieredStoreIndexFileRollingIdleInterval(0);
-        indexFile.doScheduleTask();
-        Awaitility.await().atMost(Duration.ofSeconds(10))
-            .until(() -> !indexFile.getPreMappedFile().getFile().exists());
-
-        indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 
1200).join();
-        Assert.assertEquals(1, indexList.size());
-
-        indexFile.destroy();
-    }
-}
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
new file mode 100644
index 0000000000..22ed4cc180
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexItemTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import java.nio.ByteBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link IndexItem}: construction from explicit fields, decoding from a
+ * serialized byte array, and the field order of the encoded buffer.
+ */
+public class IndexItemTest {
+
+    // Distinct values per field so that a swapped field is detected by the assertions.
+    private final int topicId = 1;
+    private final int queueId = 2;
+    private final long offset = 3L;
+    private final int size = 4;
+    private final int hashCode = 5;
+    private final int timeDiff = 6;
+    private final int itemIndex = 7;
+
+    /** Asserts that every getter of {@code indexItem} returns the fixture value of its field. */
+    private void assertFixtureFields(IndexItem indexItem) {
+        Assert.assertEquals(topicId, indexItem.getTopicId());
+        Assert.assertEquals(queueId, indexItem.getQueueId());
+        Assert.assertEquals(offset, indexItem.getOffset());
+        Assert.assertEquals(size, indexItem.getSize());
+        Assert.assertEquals(hashCode, indexItem.getHashCode());
+        Assert.assertEquals(timeDiff, indexItem.getTimeDiff());
+        Assert.assertEquals(itemIndex, indexItem.getItemIndex());
+    }
+
+    /** The field constructor must expose each argument through the matching getter. */
+    @Test
+    public void indexItemConstructorTest() {
+        IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex);
+        assertFixtureFields(indexItem);
+    }
+
+    /** The byte-array constructor must decode the serialized layout and reject {@code null}. */
+    @Test
+    public void byteBufferConstructorTest() {
+        // Serialized field order: hashCode, topicId, queueId, offset, size, timeDiff, itemIndex.
+        ByteBuffer byteBuffer = ByteBuffer.allocate(IndexItem.INDEX_ITEM_SIZE);
+        byteBuffer.putInt(hashCode);
+        byteBuffer.putInt(topicId);
+        byteBuffer.putInt(queueId);
+        byteBuffer.putLong(offset);
+        byteBuffer.putInt(size);
+        byteBuffer.putInt(timeDiff);
+        byteBuffer.putInt(itemIndex);
+
+        IndexItem indexItem = new IndexItem(byteBuffer.array());
+        assertFixtureFields(indexItem);
+        Assert.assertNotNull(indexItem.toString());
+
+        // A null byte array must make the constructor throw.
+        Exception exception = null;
+        try {
+            new IndexItem(null);
+        } catch (Exception e) {
+            exception = e;
+        }
+        Assert.assertNotNull(exception);
+    }
+
+    /** The encoded buffer must hold the fields at the offsets of the serialized layout. */
+    @Test
+    public void getByteBufferTest() {
+        IndexItem indexItem = new IndexItem(topicId, queueId, offset, size, hashCode, timeDiff, itemIndex);
+        ByteBuffer byteBuffer = indexItem.getByteBuffer();
+        Assert.assertEquals(hashCode, byteBuffer.getInt(0));
+        Assert.assertEquals(topicId, byteBuffer.getInt(4));
+        Assert.assertEquals(queueId, byteBuffer.getInt(8));
+        Assert.assertEquals(offset, byteBuffer.getLong(12));
+        Assert.assertEquals(size, byteBuffer.getInt(20));
+        Assert.assertEquals(timeDiff, byteBuffer.getInt(24));
+        Assert.assertEquals(itemIndex, byteBuffer.getInt(28));
+    }
+}
\ No newline at end of file
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
new file mode 100644
index 0000000000..b408a7c3cf
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.FileSegmentType;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link IndexStoreFile}: header constants, position arithmetic, putting and
+ * querying keys, recovery after shutdown, and compaction into a {@link TieredFileSegment}.
+ */
+public class IndexStoreFileTest {
+
+    private static final String TOPIC_NAME = "TopicTest";
+    private static final int TOPIC_ID = 123;
+    private static final int QUEUE_ID = 2;
+    private static final long MESSAGE_OFFSET = 666L;
+    private static final int MESSAGE_SIZE = 1024;
+    private static final String KEY = "MessageKey";
+    private static final Set<String> KEY_SET = Collections.singleton(KEY);
+
+    private String filePath;
+    private TieredMessageStoreConfig storeConfig;
+    private IndexStoreFile indexStoreFile;
+
+    /**
+     * Builds a store config rooted at a random directory below {@code ${user.home}/store_test},
+     * with max hash slot number 5 and max index number 20, and opens a fresh index file on it.
+     */
+    @Before
+    public void init() throws IOException {
+        TieredStoreExecutor.init();
+        filePath = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
+        String directory = Paths.get(System.getProperty("user.home"), "store_test", filePath).toString();
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setStorePathRootDir(directory);
+        storeConfig.setTieredStoreFilePath(directory);
+        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+        storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+        storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+        indexStoreFile = new IndexStoreFile(storeConfig, System.currentTimeMillis());
+    }
+
+    /** Releases the index file under test, then removes the metadata store and temp directories. */
+    @After
+    public void shutdown() {
+        if (this.indexStoreFile != null) {
+            this.indexStoreFile.shutdown();
+            this.indexStoreFile.destroy();
+        }
+        TieredStoreTestUtil.destroyMetadataStore();
+        TieredStoreTestUtil.destroyTempDir(storeConfig.getStorePathRootDir());
+        TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath());
+        TieredStoreExecutor.shutdown();
+    }
+
+    /** Pins the header field offsets, the header size and the magic codes of the file format. */
+    @Test
+    public void testIndexHeaderConstants() {
+        Assert.assertEquals(0, IndexStoreFile.INDEX_MAGIC_CODE);
+        Assert.assertEquals(4, IndexStoreFile.INDEX_BEGIN_TIME_STAMP);
+        Assert.assertEquals(12, IndexStoreFile.INDEX_END_TIME_STAMP);
+        Assert.assertEquals(20, IndexStoreFile.INDEX_SLOT_COUNT);
+        Assert.assertEquals(24, IndexStoreFile.INDEX_ITEM_INDEX);
+        Assert.assertEquals(28, IndexStoreFile.INDEX_HEADER_SIZE);
+        // '+' binds tighter than '^' in Java; the parentheses only spell out that grouping.
+        Assert.assertEquals(0xCCDDEEFF ^ (1880681586 + 4), IndexStoreFile.BEGIN_MAGIC_CODE);
+        Assert.assertEquals(0xCCDDEEFF ^ (1880681586 + 8), IndexStoreFile.END_MAGIC_CODE);
+    }
+
+    /** Checks the status change on compaction, key building, hashing and position arithmetic. */
+    @Test
+    public void basicMethodTest() throws IOException {
+        long timestamp = System.currentTimeMillis();
+        IndexStoreFile localFile = new IndexStoreFile(storeConfig, timestamp);
+        Assert.assertEquals(timestamp, localFile.getTimestamp());
+
+        // test file status: compaction turns an unsealed file into a sealed one
+        Assert.assertEquals(IndexFile.IndexStatusEnum.UNSEALED, localFile.getFileStatus());
+        localFile.doCompaction();
+        Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, localFile.getFileStatus());
+
+        // test hash
+        Assert.assertEquals("TopicTest#MessageKey", localFile.buildKey(TOPIC_NAME, KEY));
+        Assert.assertEquals(638347386, indexStoreFile.hashCode(localFile.buildKey(TOPIC_NAME, KEY)));
+
+        // test calculate position
+        long headerSize = IndexStoreFile.INDEX_HEADER_SIZE;
+        Assert.assertEquals(headerSize + Long.BYTES * 2, indexStoreFile.getSlotPosition(2));
+        Assert.assertEquals(headerSize + Long.BYTES * 5, indexStoreFile.getSlotPosition(5));
+        Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 2,
+            indexStoreFile.getItemPosition(2));
+        Assert.assertEquals(headerSize + Long.BYTES * 5 + IndexItem.INDEX_ITEM_SIZE * 5,
+            indexStoreFile.getItemPosition(5));
+    }
+
+    /** Fills the file with one key and checks the counters, the timestamps and FILE_FULL. */
+    @Test
+    public void basicPutGetTest() {
+        long timestamp = indexStoreFile.getTimestamp();
+
+        // check metadata
+        Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+        Assert.assertEquals(0, indexStoreFile.getEndTimestamp());
+        Assert.assertEquals(0, indexStoreFile.getIndexItemCount());
+        Assert.assertEquals(0, indexStoreFile.getHashSlotCount());
+
+        // a null topic is rejected, while a null or empty key set succeeds without adding any item
+        Assert.assertEquals(AppendResult.UNKNOWN_ERROR, indexStoreFile.putKey(
+            null, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+        Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, null, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+        Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.emptySet(), MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+
+        // first item is invalid
+        for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum() - 2; i++) {
+            Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+            Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+            Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
+            Assert.assertEquals(1, indexStoreFile.getHashSlotCount());
+            Assert.assertEquals(i + 1, indexStoreFile.getIndexItemCount());
+        }
+
+        // the last free item still fits, the one after it is refused
+        Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+        Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+
+        Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+        Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
+        Assert.assertEquals(1, indexStoreFile.getHashSlotCount());
+        Assert.assertEquals(storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1, indexStoreFile.getIndexItemCount());
+    }
+
+    /** Puts three items for each of five topics and expects five hash slots and fifteen items. */
+    @Test
+    public void differentKeyPutTest() {
+        long timestamp = indexStoreFile.getTimestamp();
+        for (int i = 0; i < 5; i++) {
+            for (int j = 0; j < 3; j++) {
+                Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+                    TOPIC_NAME + i, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+            }
+        }
+        Assert.assertEquals(timestamp, indexStoreFile.getTimestamp());
+        Assert.assertEquals(timestamp, indexStoreFile.getEndTimestamp());
+        Assert.assertEquals(5, indexStoreFile.getHashSlotCount());
+        Assert.assertEquals(5 * 3, indexStoreFile.getIndexItemCount());
+    }
+
+    /**
+     * Fills the file from four worker threads and expects every put to succeed, the next put to
+     * report FILE_FULL and the item count to equal the number of concurrent puts.
+     */
+    @Test
+    public void concurrentPutTest() throws InterruptedException {
+        long timestamp = indexStoreFile.getTimestamp();
+
+        ExecutorService executorService = Executors.newFixedThreadPool(
+            4, new ThreadFactoryImpl("ConcurrentPutGetTest"));
+
+        // first item is invalid
+        int indexCount = storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1;
+        // Results are collected and asserted on the test thread: an assertion failing inside a
+        // submitted task would only be captured by its ignored Future and would skip countDown().
+        AppendResult[] results = new AppendResult[indexCount];
+        CountDownLatch latch = new CountDownLatch(indexCount);
+        try {
+            for (int i = 0; i < indexCount; i++) {
+                final int index = i;
+                executorService.submit(() -> {
+                    try {
+                        results[index] = indexStoreFile.putKey(
+                            TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp);
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        // keep the interrupt visible to the pool instead of swallowing it
+                        Thread.currentThread().interrupt();
+                    } finally {
+                        // always release the latch so that a failed put cannot hang the test
+                        latch.countDown();
+                    }
+                });
+            }
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+        } finally {
+            executorService.shutdown();
+        }
+
+        for (AppendResult result : results) {
+            Assert.assertEquals(AppendResult.SUCCESS, result);
+        }
+        Assert.assertEquals(AppendResult.FILE_FULL, indexStoreFile.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+        Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
+    }
+
+    /** Reopens the file with the same timestamp after shutdown; the item count must survive. */
+    @Test
+    public void recoverFileTest() throws IOException {
+        int indexCount = 10;
+        long timestamp = indexStoreFile.getTimestamp();
+        for (int i = 0; i < indexCount; i++) {
+            Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+        }
+        indexStoreFile.shutdown();
+        Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
+        indexStoreFile = new IndexStoreFile(storeConfig, timestamp);
+        Assert.assertEquals(indexCount, indexStoreFile.getIndexItemCount());
+    }
+
+    /** Compacts the file; a committed index segment must have the size of the compacted buffer. */
+    @Test
+    public void doCompactionTest() throws Exception {
+        long timestamp = indexStoreFile.getTimestamp();
+        for (int i = 0; i < 10; i++) {
+            Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, timestamp));
+        }
+
+        ByteBuffer byteBuffer = indexStoreFile.doCompaction();
+        TieredFileSegment fileSegment = new PosixFileSegment(
+            storeConfig, FileSegmentType.INDEX, filePath, 0L);
+        try {
+            fileSegment.append(byteBuffer, timestamp);
+            fileSegment.commit();
+            Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
+        } finally {
+            // remove the segment file even when an assertion above fails
+            fileSegment.destroyFile();
+        }
+    }
+
+    /** Queries an unsealed file and expects exactly the three items put for the queried topic. */
+    @Test
+    public void queryAsyncFromUnsealedFileTest() throws Exception {
+        long timestamp = indexStoreFile.getTimestamp();
+        for (int i = 0; i < 5; i++) {
+            for (int j = 0; j < 3; j++) {
+                Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i,
+                    TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis()));
+            }
+        }
+        List<IndexItem> itemList = indexStoreFile.queryAsync(
+            TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get();
+        Assert.assertEquals(3, itemList.size());
+    }
+
+    /** Queries a file reopened from its compacted segment; only the exact topic and key match. */
+    @Test
+    public void queryAsyncFromSegmentFileTest() throws ExecutionException, InterruptedException {
+        long timestamp = indexStoreFile.getTimestamp();
+        for (int i = 0; i < 5; i++) {
+            for (int j = 0; j < 3; j++) {
+                Assert.assertEquals(AppendResult.SUCCESS, indexStoreFile.putKey(TOPIC_NAME + i,
+                    TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, MESSAGE_SIZE, System.currentTimeMillis()));
+            }
+        }
+
+        ByteBuffer byteBuffer = indexStoreFile.doCompaction();
+        TieredFileSegment fileSegment = new PosixFileSegment(
+            storeConfig, FileSegmentType.INDEX, filePath, 0L);
+        fileSegment.append(byteBuffer, timestamp);
+        fileSegment.commit();
+        Assert.assertEquals(byteBuffer.limit(), fileSegment.getSize());
+        indexStoreFile.destroy();
+
+        indexStoreFile = new IndexStoreFile(storeConfig, fileSegment);
+
+        // change topic
+        List<IndexItem> itemList = indexStoreFile.queryAsync(
+            TOPIC_NAME, KEY, 64, timestamp, System.currentTimeMillis()).get();
+        Assert.assertEquals(0, itemList.size());
+
+        // change key
+        itemList = indexStoreFile.queryAsync(
+            TOPIC_NAME, KEY + "1", 64, timestamp, System.currentTimeMillis()).get();
+        Assert.assertEquals(0, itemList.size());
+
+        itemList = indexStoreFile.queryAsync(
+            TOPIC_NAME + "1", KEY, 64, timestamp, System.currentTimeMillis()).get();
+        Assert.assertEquals(3, itemList.size());
+    }
+}
\ No newline at end of file
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
new file mode 100644
index 0000000000..57d00eefe1
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.index;
+
+import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@Ignore
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgs = {"-Djava.net.preferIPv4Stack=true", 
"-Djmh.rmi.port=1099"})
+public class IndexStoreServiceBenchTest {
+
+    // Logger obtained under the tiered store logger name.
+    private static final Logger log = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+    // Topic name used by the benchmark methods of this class.
+    private static final String TOPIC_NAME = "TopicTest";
+    // Store configuration created in init().
+    private TieredMessageStoreConfig storeConfig;
+    // Service under benchmark; created and started in init(), stopped in shutdown().
+    private IndexStoreService indexStoreService;
+    // Counter updated by doPutThroughputBenchmark(); it is never read within this class.
+    private final LongAdder failureCount = new LongAdder();
+
+    /**
+     * Benchmark setup: wipes the store directories of earlier runs, builds the store
+     * configuration for the POSIX file segment provider and starts the index service.
+     */
+    @Setup
+    public void init() throws ClassNotFoundException, NoSuchMethodException {
+        final String indexRootDir =
+            Paths.get(System.getProperty("user.home"), "store_test", "index").toString();
+
+        // Remove leftovers of previous runs before anything is created.
+        UtilAll.deleteFile(new File(indexRootDir));
+        UtilAll.deleteFile(new File("./e96d41b2_IndexService"));
+
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setBrokerClusterName("IndexService");
+        storeConfig.setBrokerName("IndexServiceBroker");
+        storeConfig.setStorePathRootDir(indexRootDir);
+        storeConfig.setTieredBackendServiceProvider(
+            "org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500 * 1000);
+        storeConfig.setTieredStoreIndexFileMaxIndexNum(2000 * 1000);
+
+        TieredStoreUtil.getMetadataStore(storeConfig);
+        TieredStoreExecutor.init();
+
+        indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), indexRootDir);
+        indexStoreService.start();
+    }
+
+    /**
+     * Benchmark teardown: shuts the index service down, destroys it and then shuts
+     * down the TieredStoreExecutor, in that order.
+     */
+    @TearDown
+    public void shutdown() throws IOException {
+        indexStoreService.shutdown();
+        indexStoreService.destroy();
+        TieredStoreExecutor.shutdown();
+    }
+
+    /**
+     * Put benchmark: appends 100 single-key index entries per invocation and counts
+     * every append whose result is not {@link AppendResult#SUCCESS} in failureCount.
+     */
+    //@Benchmark
+    @Threads(2)
+    @BenchmarkMode(Mode.Throughput)
+    @OutputTimeUnit(TimeUnit.SECONDS)
+    @Warmup(iterations = 1, time = 1)
+    @Measurement(iterations = 1, time = 1)
+    public void doPutThroughputBenchmark() {
+        for (int i = 0; i < 100; i++) {
+            AppendResult result = indexStoreService.putKey(
+                TOPIC_NAME, 123, 2, Collections.singleton(String.valueOf(i)),
+                i * 100L, i * 100, System.currentTimeMillis());
+            // A failure is any result other than SUCCESS (the check used to be inverted).
+            if (!AppendResult.SUCCESS.equals(result)) {
+                failureCount.increment();
+            }
+        }
+    }
+
+    /**
+     * Query benchmark: writes every key once for each of ten queue ids, then runs
+     * one million query pairs and logs the elapsed time. Each key is expected to
+     * yield ten items, so a limit of 20 returns 10 and a limit of 5 returns 5.
+     *
+     * NOTE(review): this method carries no Benchmark annotation, so JMH presumably
+     * does not pick it up - confirm whether that is intended.
+     */
+    @Threads(1)
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.SECONDS)
+    @Warmup(iterations = 0)
+    @Measurement(iterations = 1, time = 1)
+    public void doGetThroughputBenchmark() throws ExecutionException, InterruptedException {
+        // Write with the same TOPIC_NAME constant that the queries below use, so that
+        // changing the constant cannot make puts and queries target different topics.
+        for (int j = 0; j < 10; j++) {
+            for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
+                indexStoreService.putKey(
+                    TOPIC_NAME, 123, j, Collections.singleton(String.valueOf(i)),
+                    i * 100L, i * 100, System.currentTimeMillis());
+            }
+        }
+
+        int queryCount = 100 * 10000;
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        for (int i = 0; i < queryCount; i++) {
+            List<IndexItem> indexItems = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i),
+                20, 0, System.currentTimeMillis()).get();
+            Assert.assertEquals(10, indexItems.size());
+
+            List<IndexItem> indexItems2 = indexStoreService.queryAsync(TOPIC_NAME, String.valueOf(i),
+                5, 0, System.currentTimeMillis()).get();
+            Assert.assertEquals(5, indexItems2.size());
+        }
+        log.info("DoGetThroughputBenchmark test cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Runs the benchmarks of this class through the JMH runner without warmup and
+     * with a single measurement iteration, writing the result to result.json as JSON.
+     */
+    public static void main(String[] args) throws Exception {
+        new Runner(
+            new OptionsBuilder()
+                .include(IndexStoreServiceBenchTest.class.getSimpleName())
+                .warmupIterations(0)
+                .measurementIterations(1)
+                .result("result.json")
+                .resultFormat(ResultFormatType.JSON)
+                .build()
+        ).run();
+    }
+}
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
new file mode 100644
index 0000000000..20b4acbfa1
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.file.TieredFileAllocator;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class IndexStoreServiceTest {
+
+    // Logger obtained under the tiered store logger name.
+    private static final Logger log = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    // Fixed arguments passed to putKey/queryAsync by the tests below.
+    private static final String TOPIC_NAME = "TopicTest";
+    private static final int TOPIC_ID = 123;
+    private static final int QUEUE_ID = 2;
+    private static final long MESSAGE_OFFSET = 666;
+    private static final int MESSAGE_SIZE = 1024;
+    private static final Set<String> KEY_SET = 
Collections.singleton("MessageKey");
+
+    // Random eight character name generated in init(); passed to IndexStoreService as its path.
+    private String filePath;
+    // Configuration rebuilt in init() for every test.
+    private TieredMessageStoreConfig storeConfig;
+    // Allocator created from storeConfig in init().
+    private TieredFileAllocator fileAllocator;
+    // Service under test; created by the individual tests and released in shutdown().
+    private IndexStoreService indexService;
+
+    /**
+     * Per-test setup: starts the executor, picks a random directory name and builds
+     * a configuration with 5 hash slots and 20 index entries per index file.
+     */
+    @Before
+    public void init() throws IOException, ClassNotFoundException, NoSuchMethodException {
+        TieredStoreExecutor.init();
+
+        // The first eight characters of a dash-free random UUID name this run's directory.
+        String compactUuid = UUID.randomUUID().toString().replace("-", "");
+        filePath = compactUuid.substring(0, 8);
+        String rootDir = Paths.get(System.getProperty("user.home"), "store_test", filePath).toString();
+
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setStorePathRootDir(rootDir);
+        storeConfig.setTieredStoreFilePath(rootDir);
+        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+        storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+        storeConfig.setTieredBackendServiceProvider(
+            "org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+
+        fileAllocator = new TieredFileAllocator(storeConfig);
+    }
+
+    /**
+     * Per-test teardown: stops and destroys the index service if a test created one,
+     * then removes the metadata store and both configured directories and finally
+     * shuts the executor down.
+     */
+    @After
+    public void shutdown() {
+        // Not every test necessarily assigns indexService, hence the null check.
+        if (indexService != null) {
+            indexService.shutdown();
+            indexService.destroy();
+        }
+        TieredStoreTestUtil.destroyMetadataStore();
+        TieredStoreTestUtil.destroyTempDir(storeConfig.getStorePathRootDir());
+        
TieredStoreTestUtil.destroyTempDir(storeConfig.getTieredStoreFilePath());
+        TieredStoreExecutor.shutdown();
+    }
+
+    /**
+     * Appends 50 entries with at most 20 entries per index file configured and
+     * expects the service to end up with three index files.
+     */
+    @Test
+    public void basicServiceTest() throws InterruptedException {
+        indexService = new IndexStoreService(fileAllocator, filePath);
+
+        for (int entry = 0; entry < 50; entry++) {
+            AppendResult appendResult = indexService.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, entry * 100, MESSAGE_SIZE, System.currentTimeMillis());
+            Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+            TimeUnit.MILLISECONDS.sleep(1);
+        }
+
+        ConcurrentSkipListMap<Long, IndexFile> fileTable = indexService.getTimeStoreTable();
+        Assert.assertEquals(3, fileTable.size());
+    }
+
+    /**
+     * Writes one entry, shuts the service down, places a file named "0000" into the
+     * index directory and checks that a newly created service exposes exactly one
+     * index file keyed by the original timestamp.
+     * Presumably "0000" is the name used by the old index file format - verify
+     * against IndexStoreService.
+     */
+    @Test
+    public void doConvertOldFormatTest() throws IOException {
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        // The key of the first table entry is reused as the store timestamp of the entry below.
+        long timestamp = indexService.getTimeStoreTable().firstKey();
+        Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
+            TOPIC_NAME, TOPIC_ID, QUEUE_ID, KEY_SET, MESSAGE_OFFSET, 
MESSAGE_SIZE, timestamp));
+        indexService.shutdown();
+
+        // Path of the index file: filePath / FILE_DIRECTORY_NAME / timestamp.
+        File file = new File(Paths.get(filePath, 
IndexStoreService.FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString());
+        // NOTE(review): the mapped file is opened with file.getName(), i.e. the bare
+        // file name without its parent directory - confirm file.getPath() was not intended.
+        DefaultMappedFile mappedFile = new DefaultMappedFile(file.getName(), 
(int) file.length());
+        // Rename the mapped file to "0000" inside the index file's parent directory.
+        mappedFile.renameTo(String.valueOf(new File(file.getParent(), 
"0000")));
+        mappedFile.shutdown(10 * 1000);
+
+        // A fresh service must still report a single file under the original timestamp.
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        ConcurrentSkipListMap<Long, IndexFile> timeStoreTable = 
indexService.getTimeStoreTable();
+        Assert.assertEquals(1, timeStoreTable.size());
+        Assert.assertEquals(Long.valueOf(timestamp), 
timeStoreTable.firstKey());
+        mappedFile.destroy(10 * 1000);
+    }
+
+    /**
+     * Appends 5000 entries from four threads with at most 2000 entries per index
+     * file configured and expects three index files afterwards. Also called by
+     * other tests of this class to populate the service.
+     */
+    @Test
+    public void concurrentPutTest() throws InterruptedException {
+        ExecutorService executorService = Executors.newFixedThreadPool(
+            4, new ThreadFactoryImpl("ConcurrentPutTest"));
+        // The pool is shut down in finally so that a failing assertion does not leak its threads.
+        try {
+            storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500);
+            storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
+            indexService = new IndexStoreService(fileAllocator, filePath);
+            long timestamp = System.currentTimeMillis();
+
+            // first item is invalid
+            // The success counter is collected but not asserted on.
+            AtomicInteger success = new AtomicInteger();
+            int indexCount = 5000;
+            CountDownLatch latch = new CountDownLatch(indexCount);
+            for (int i = 0; i < indexCount; i++) {
+                final int index = i;
+                executorService.submit(() -> {
+                    try {
+                        AppendResult result = indexService.putKey(
+                            TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(index)),
+                            index * 100, MESSAGE_SIZE, timestamp + index);
+                        if (AppendResult.SUCCESS.equals(result)) {
+                            success.incrementAndGet();
+                        }
+                    } catch (Exception e) {
+                        log.error("ConcurrentPutTest error", e);
+                    } finally {
+                        // Always count down so the waiting test thread is released.
+                        latch.countDown();
+                    }
+                });
+            }
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            Assert.assertEquals(3, indexService.getTimeStoreTable().size());
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Populates three index files, then compacts and uploads the sealed ones one at
+     * a time, checking the status of all three files after every step.
+     */
+    @Test
+    public void doCompactionTest() throws InterruptedException {
+        concurrentPutTest();
+
+        // Step 1: compact and upload the first sealed file, then destroy that file object.
+        IndexFile firstSealed = indexService.getNextSealedFile();
+        Assert.assertEquals(IndexFile.IndexStatusEnum.SEALED, firstSealed.getFileStatus());
+        indexService.doCompactThenUploadFile(firstSealed);
+        indexService.setCompactTimestamp(firstSealed.getTimestamp());
+        firstSealed.destroy();
+        assertStatusOfThreeFiles(
+            IndexFile.IndexStatusEnum.UPLOAD,
+            IndexFile.IndexStatusEnum.SEALED,
+            IndexFile.IndexStatusEnum.UNSEALED);
+
+        // Step 2: the next sealed file is compacted and uploaded as well.
+        IndexFile secondSealed = indexService.getNextSealedFile();
+        indexService.doCompactThenUploadFile(secondSealed);
+        indexService.setCompactTimestamp(secondSealed.getTimestamp());
+        assertStatusOfThreeFiles(
+            IndexFile.IndexStatusEnum.UPLOAD,
+            IndexFile.IndexStatusEnum.UPLOAD,
+            IndexFile.IndexStatusEnum.UNSEALED);
+
+        // Step 3: no sealed file is left and the statuses stay as they were.
+        Assert.assertNull(indexService.getNextSealedFile());
+        assertStatusOfThreeFiles(
+            IndexFile.IndexStatusEnum.UPLOAD,
+            IndexFile.IndexStatusEnum.UPLOAD,
+            IndexFile.IndexStatusEnum.UNSEALED);
+    }
+
+    /** Asserts the status of the first three files of the time store table, in table order. */
+    private void assertStatusOfThreeFiles(IndexFile.IndexStatusEnum first,
+        IndexFile.IndexStatusEnum second, IndexFile.IndexStatusEnum third) {
+        List<IndexFile> snapshot = new ArrayList<>(indexService.getTimeStoreTable().values());
+        Assert.assertEquals(first, snapshot.get(0).getFileStatus());
+        Assert.assertEquals(second, snapshot.get(1).getFileStatus());
+        Assert.assertEquals(third, snapshot.get(2).getFileStatus());
+    }
+
+    /**
+     * Populates three index files, starts the service and waits up to one minute
+     * until the first two files report UPLOAD while the third is still UNSEALED.
+     */
+    @Test
+    public void runServiceTest() throws InterruptedException {
+        concurrentPutTest();
+        indexService.start();
+        await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> {
+            List<IndexFile> snapshot = new ArrayList<>(indexService.getTimeStoreTable().values());
+            // All three statuses are read on every poll before they are combined.
+            boolean firstUploaded =
+                IndexFile.IndexStatusEnum.UPLOAD.equals(snapshot.get(0).getFileStatus());
+            boolean secondUploaded =
+                IndexFile.IndexStatusEnum.UPLOAD.equals(snapshot.get(1).getFileStatus());
+            boolean thirdUnsealed =
+                IndexFile.IndexStatusEnum.UNSEALED.equals(snapshot.get(2).getFileStatus());
+            return firstUploaded && secondUploaded && thirdUnsealed;
+        });
+    }
+
+    /**
+     * Recreates the service twice on the same path and checks that the first index
+     * file keeps its timestamp, reaches UPLOAD once the service runs, and is still
+     * reported as UPLOAD after the final restart.
+     */
+    @Test
+    public void restartServiceTest() throws InterruptedException {
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        for (int entry = 0; entry < 20; entry++) {
+            Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
+                TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(entry)),
+                entry * 100L, MESSAGE_SIZE, System.currentTimeMillis()));
+            TimeUnit.MILLISECONDS.sleep(1);
+        }
+        final long firstFileTimestamp = indexService.getTimeStoreTable().firstKey();
+
+        // First restart: the first file must still be keyed by the same timestamp.
+        indexService.shutdown();
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        Assert.assertEquals(firstFileTimestamp, indexService.getTimeStoreTable().firstKey().longValue());
+
+        // Start the service and wait until the first file reports UPLOAD.
+        indexService.start();
+        await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).until(() -> {
+            List<IndexFile> snapshot = new ArrayList<>(indexService.getTimeStoreTable().values());
+            IndexFile firstFile = snapshot.get(0);
+            return IndexFile.IndexStatusEnum.UPLOAD.equals(firstFile.getFileStatus());
+        });
+        indexService.shutdown();
+
+        // Second restart: two files are expected and the first one is still UPLOAD.
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        Assert.assertEquals(firstFileTimestamp, indexService.getTimeStoreTable().firstKey().longValue());
+        Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+        Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
+            indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
+    }
+
+    /**
+     * Writes the same keys in three batches so that they end up in three index files,
+     * then checks that a query for one key returns at most one item per file and
+     * respects the requested maximum.
+     */
+    @Test
+    public void queryFromFileTest() throws InterruptedException, ExecutionException {
+        final long beginTime = System.currentTimeMillis();
+        indexService = new IndexStoreService(fileAllocator, filePath);
+
+        // three files, each contains 19 items
+        for (int batch = 0; batch < 3; batch++) {
+            for (int key = 0; key < 20 - 1; key++) {
+                Assert.assertEquals(AppendResult.SUCCESS, indexService.putKey(
+                    TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(key)),
+                    batch * 100L + key, MESSAGE_SIZE, System.currentTimeMillis()));
+                TimeUnit.MILLISECONDS.sleep(1);
+            }
+        }
+
+        List<IndexFile> fileSnapshot = new ArrayList<>(indexService.getTimeStoreTable().values());
+        Assert.assertEquals(3, fileSnapshot.size());
+
+        // Each row is {maximum passed to the query, expected number of returned items}.
+        final int[][] expectations = {{1, 1}, {3, 3}, {5, 3}};
+        for (int round = 0; round < 3; round++) {
+            for (int[] expectation : expectations) {
+                List<IndexItem> found = indexService.queryAsync(
+                    TOPIC_NAME, String.valueOf(1), expectation[0], beginTime, System.currentTimeMillis()).get();
+                Assert.assertEquals(expectation[1], found.size());
+            }
+        }
+    }
+
+    /**
+     * Writes every key once for each of ten queue ids, then queries ten keys from
+     * four threads with limits of 5, 10 and 15 and expects min(fileCount, limit)
+     * items for every query.
+     */
+    @Test
+    public void concurrentGetTest() throws InterruptedException {
+        storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
+        indexService = new IndexStoreService(fileAllocator, filePath);
+        indexService.start();
+
+        int fileCount = 10;
+        for (int j = 0; j < fileCount; j++) {
+            for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
+                indexService.putKey(TOPIC_NAME, TOPIC_ID, j, Collections.singleton(String.valueOf(i)),
+                    i * 100L, i * 100, System.currentTimeMillis());
+            }
+            TimeUnit.MILLISECONDS.sleep(1);
+        }
+
+        CountDownLatch latch = new CountDownLatch(fileCount * 3);
+        AtomicBoolean result = new AtomicBoolean(true);
+        ExecutorService executorService = Executors.newFixedThreadPool(
+            4, new ThreadFactoryImpl("ConcurrentGetTest"));
+
+        // The pool is shut down in finally so that a latch timeout does not leak its threads.
+        try {
+            for (int i = 0; i < fileCount; i++) {
+                int finalI = i;
+                executorService.submit(() -> {
+                    for (int j = 1; j <= 3; j++) {
+                        try {
+                            List<IndexItem> indexItems = indexService.queryAsync(
+                                TOPIC_NAME, String.valueOf(finalI), j * 5, 0, System.currentTimeMillis()).get();
+                            if (Math.min(fileCount, j * 5) != indexItems.size()) {
+                                result.set(false);
+                            }
+                        } catch (Exception e) {
+                            // Log the cause instead of silently turning it into a failed flag.
+                            log.error("ConcurrentGetTest error", e);
+                            result.set(false);
+                        } finally {
+                            latch.countDown();
+                        }
+                    }
+                });
+            }
+
+            Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
+        } finally {
+            executorService.shutdown();
+        }
+        Assert.assertTrue(result.get());
+    }
+}
\ No newline at end of file
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index a413f2113e..68277cacc5 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -135,7 +135,6 @@ public class MessageBufferUtilTest {
         Assert.assertEquals("uservalue0", properties.get("userkey"));
     }
 
-
     @Test
     public void testGetTotalSize() {
         ByteBuffer buffer = buildMockedMessageBuffer();
diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml 
b/tieredstore/src/test/resources/rmq.logback-test.xml
index a7933b5efb..ac0895e05e 100644
--- a/tieredstore/src/test/resources/rmq.logback-test.xml
+++ b/tieredstore/src/test/resources/rmq.logback-test.xml
@@ -19,11 +19,22 @@
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - 
%msg%n</pattern>
+            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
         </encoder>
     </appender>
 
+    <!-- NOTE(review): this property is declared after the encoder that references
+         ${CONSOLE_LOG_PATTERN}; logback presumably resolves variables in document
+         order, so confirm the pattern is defined at the point of use. -->
+    <property name="CONSOLE_LOG_PATTERN"
+              value="%d{yyyy-MM-dd HH:mm:ss.SSS,GMT+8} 
${LOG_LEVEL_PATTERN:-%5p} [%20.20thread] [%20.20logger{39}] %m%n"/>
+
     <root level="info">
-        <appender-ref ref="STDOUT" />
+        <appender-ref ref="STDOUT"/>
     </root>
+
+    <logger name="RocketmqStore" additivity="false" level="ERROR">
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <logger name="RocketmqCommon" additivity="false" level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </logger>
 </configuration>

Reply via email to