mimaison commented on code in PR #19469:
URL: https://github.com/apache/kafka/pull/19469#discussion_r2042814518


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java:
##########
@@ -0,0 +1,1207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.DIR_NAME;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFileName;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFileName;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFileName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class RemoteIndexCacheTest {
+
+    private final long defaultRemoteIndexCacheSizeBytes = 1024 * 1024L;
+    private final Logger logger = LoggerFactory.getLogger(RemoteIndexCacheTest.class);
+    private final MockTime time = new MockTime();
+    private final int brokerId = 1;
+    private final long baseOffset = Integer.MAX_VALUE + 101337L; // start with a base offset which is a long
+    private final long lastOffset = baseOffset + 30L;
+    private final int segmentSize = 1024;
+    private final RemoteStorageManager rsm = mock(RemoteStorageManager.class);
+    private RemoteIndexCache cache;
+    private RemoteLogSegmentMetadata rlsMetadata;
+    private File logDir;
+    private File tpDir;
+    private TopicIdPartition idPartition;
+
+    @BeforeEach
+    public void setup() throws IOException, RemoteStorageException {
+        idPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        logDir = TestUtils.tempDirectory("kafka-" + this.getClass().getSimpleName());
+        tpDir = new File(logDir, idPartition.toString());
+        Files.createDirectory(tpDir.toPath());
+
+        RemoteLogSegmentId remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition);
+        rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
+
+        cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString());
+
+        mockRsmFetchIndex(rsm);
+    }
+
+    @AfterEach
+    public void cleanup() {
+        reset(rsm);
+        // the files created for the test will be deleted automatically on thread exit since we use temp dir
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory
+        // will be deleted at the end of test.
+        try {
+            Utils.delete(logDir);
+        } catch (IOException ioe) {
+            // ignore
+        }
+        // Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach
+        // because this may throw an exception and prevent cleanup after it
+        TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true);
+    }
+
+    @Test
+    public void testIndexFileNameAndLocationOnDisk() {
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        Path offsetIndexFile = entry.offsetIndex().file().toPath();
+        Path txnIndexFile = entry.txnIndex().file().toPath();
+        Path timeIndexFile = entry.timeIndex().file().toPath();
+
+        String expectedOffsetIndexFileName = remoteOffsetIndexFileName(rlsMetadata);
+        String expectedTimeIndexFileName = remoteTimeIndexFileName(rlsMetadata);
+        String expectedTxnIndexFileName = remoteTransactionIndexFileName(rlsMetadata);
+
+        assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName().toString());
+        assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName().toString());
+        assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName().toString());
+
+        // assert that parent directory for the index files is correct
+        assertEquals(DIR_NAME, offsetIndexFile.getParent().getFileName().toString(),
+                "offsetIndex=" + offsetIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, txnIndexFile.getParent().getFileName().toString(),
+                "txnIndex=" + txnIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, timeIndexFile.getParent().getFileName().toString(),
+                "timeIndex=" + timeIndexFile + " is created under incorrect parent");
+    }
+
+    @Test
+    public void testFetchIndexFromRemoteStorage() throws RemoteStorageException {
+        OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
+        OffsetPosition offsetPosition1 = offsetIndex.entry(1);
+        // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+        int resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset);
+        assertEquals(offsetPosition1.position, resultPosition);
+        verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP));
+
+        // this should not cause fetching index from RemoteStorageManager as it is already fetched earlier
+        reset(rsm);
+        OffsetPosition offsetPosition2 = offsetIndex.entry(2);
+        int resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset);
+        assertEquals(offsetPosition2.position, resultPosition2);
+        assertNotNull(cache.getIndexEntry(rlsMetadata));
+        verifyNoInteractions(rsm);
+    }
+
+    @Test
+    public void testFetchIndexForMissingTransactionIndex() throws RemoteStorageException {
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> {
+            RemoteLogSegmentMetadata metadata = ans.getArgument(0);
+            IndexType indexType = ans.getArgument(1);
+            OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir);
+            TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir);
+            maybeAppendIndexEntries(offsetIdx, timeIdx);
+            return switch (indexType) {
+                case OFFSET -> new FileInputStream(offsetIdx.file());
+                case TIMESTAMP -> new FileInputStream(timeIdx.file());
+                    // Throw RemoteResourceNotFoundException since transaction index is not available
+                case TRANSACTION -> throw new RemoteResourceNotFoundException("txn index not found");
+                case LEADER_EPOCH -> null; // leader-epoch-cache is not accessed.
+                case PRODUCER_SNAPSHOT -> null;  // producer-snapshot is not accessed.
+            };
+        });
+
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        // Verify an empty file is created in the cache directory
+        assertTrue(entry.txnIndex().file().exists());
+        assertEquals(0, entry.txnIndex().file().length());
+    }
+
+    @Test
+    public void testPositionForNonExistingIndexFromRemoteStorage() {
+        OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
+        int lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset());
+        long greaterOffsetThanLastOffset = offsetIndex.lastOffset() + 1;
+        assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset));
+
+        // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than the least entry in the offset index.
+        OffsetPosition nonExistentOffsetPosition = new OffsetPosition(baseOffset, 0);
+        long lowerOffsetThanBaseOffset = offsetIndex.baseOffset() - 1;
+        assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset));
+    }
+
+    @Test
+    public void testCacheEntryExpiry() throws IOException, RemoteStorageException, InterruptedException {
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        // Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, bringing the count to 2
+        cache.getIndexEntry(metadataList.get(0));
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(2);
+
+        // Getting index for metadataList.last should call rsm#fetchIndex
+        // to populate this entry, one of the other 2 entries will be evicted. We don't know which one since it's based on
+        // a probabilistic formula for Window TinyLfu. See docs for RemoteIndexCache
+        int size = metadataList.size();
+        assertNotNull(cache.getIndexEntry(metadataList.get(size - 1)));
+        assertAtLeastOnePresent(cache, metadataList.get(1).remoteLogSegmentId().id(), metadataList.get(0).remoteLogSegmentId().id());
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(3);
+
+        // getting index for the last expired entry should call rsm#fetchIndex as that entry was expired earlier
+        Optional<RemoteLogSegmentMetadata> missingEntryOpt = Optional.empty();
+        for (RemoteLogSegmentMetadata entry : metadataList) {
+            Uuid segmentId = entry.remoteLogSegmentId().id();
+            if (!cache.internalCache().asMap().containsKey(segmentId)) {
+                missingEntryOpt = Optional.of(entry);
+                break;
+            }
+        }
+        assertFalse(missingEntryOpt.isEmpty());
+        cache.getIndexEntry(missingEntryOpt.get());
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(4);
+    }
+
+    @Test
+    public void testGetIndexAfterCacheClose() throws IOException, RemoteStorageException, InterruptedException {
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+
+        cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        cache.close();
+
+        // Check IllegalStateException is thrown when index is accessed after it is closed.
+        assertThrows(IllegalStateException.class, () -> cache.getIndexEntry(metadataList.get(0)));
+    }
+
+    @Test
+    public void testCloseIsIdempotent() throws IOException {
+        // generate and add entry to cache
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
+
+        cache.close();
+        cache.close();
+
+        // verify that entry is only closed once
+        verify(spyEntry).close();
+    }
+
+    @Test
+    public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedException {
+        Uuid internalIndexKey = rlsMetadata.remoteLogSegmentId().id();
+        RemoteIndexCache.Entry cacheEntry = generateSpyCacheEntry();
+
+        // verify index files on disk
+        assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), "Offset index file should be present on disk at " + tpDir.toPath());
+        assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), "Txn index file should be present on disk at " + tpDir.toPath());
+        assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), "Time index file should be present on disk at " + tpDir.toPath());
+
+        // add the spied entry into the cache, it will overwrite the non-spied entry
+        cache.internalCache().put(internalIndexKey, cacheEntry);
+
+        // no expired entries yet
+        assertEquals(0, cache.expiredIndexes().size(), "expiredIndex queue should be zero at start of test");
+
+        // call remove function to mark the entry for removal
+        cache.remove(internalIndexKey);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+                "Failed to mark cache entry for cleanup after invalidation");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+                "Failed to cleanup cache entry after invalidation");
+
+        // first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
+        verify(cacheEntry, times(2)).markForCleanup();
+        // after that async it will be cleaned up
+        verify(cacheEntry).cleanup();
+
+        // verify that index(s) rename is only called 1 time
+        verify(cacheEntry.timeIndex()).renameTo(any(File.class));
+        verify(cacheEntry.offsetIndex()).renameTo(any(File.class));
+        verify(cacheEntry.txnIndex()).renameTo(any(File.class));
+
+        // verify no index files on disk
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(),
+                "Offset index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(),
+                "Txn index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(),
+                "Time index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(),
+                "Index file marked for deletion should not be present on disk at " + tpDir.toPath());
+    }
+
+    private Optional<Path> getIndexFileFromDisk(String suffix) throws IOException {
+        return Files.walk(tpDir.toPath())
+                .filter(Files::isRegularFile)
+                .filter(path -> path.getFileName().toString().endsWith(suffix))
+                .findAny();
+    }
+
+    @Test
+    public void testCleanerThreadShutdown() throws IOException, InterruptedException {
+        // cache is empty at beginning
+        assertTrue(cache.internalCache().asMap().isEmpty());
+        // verify that cleaner thread is running
+        Set<Thread> threads = getRunningCleanerThread();
+        assertEquals(1, threads.size(),
+                "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
+        // create a new entry
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        // an exception should not close the cleaner thread
+        doThrow(new RuntimeException("kaboom! I am expected exception in unit test.")).when(spyEntry).cleanup();
+        Uuid key = Uuid.randomUuid();
+        cache.internalCache().put(key, spyEntry);
+        // trigger cleanup
+        cache.remove(key);
+        // Give the cleaner thread some time to throw an exception
+        Thread.sleep(100);
+        verify(spyEntry, times(1)).cleanup();
+        // Verify that the cleaner thread is still running even when an exception is thrown in doWork()
+        threads = getRunningCleanerThread();
+        assertEquals(1, threads.size(),
+                "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
+        // close the cache properly
+        cache.close();
+        // verify that the thread is closed properly
+        threads = getRunningCleanerThread();
+        assertTrue(threads.isEmpty(), "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
+        // if the thread has been shut down correctly it will not be running
+        assertFalse(cache.cleanerThread().isRunning(), "Unexpected thread state=running. Check error logs.");
+    }
+
+    @Test
+    public void testClose() throws IOException, InterruptedException {
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
+
+        TestUtils.waitForCondition(() -> cache.cleanerThread().isStarted(), "Cleaner thread should be started");
+
+        // close the cache
+        cache.close();
+
+        // closing the cache should close the entry
+        verify(spyEntry).close();
+
+        // close for all index entries must be invoked
+        verify(spyEntry.txnIndex()).close();
+        verify(spyEntry.offsetIndex()).close();
+        verify(spyEntry.timeIndex()).close();
+
+        // index files must not be deleted
+        verify(spyEntry.txnIndex(), times(0)).deleteIfExists();
+        verify(spyEntry.offsetIndex(), times(0)).deleteIfExists();
+        verify(spyEntry.timeIndex(), times(0)).deleteIfExists();
+
+        // verify cleaner thread is shutdown
+        assertTrue(cache.cleanerThread().isShutdownComplete());
+    }
+
+    @Test
+    public void testConcurrentReadWriteAccessForCache() throws InterruptedException, RemoteStorageException {
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP));
+        reset(rsm);
+
+        // Simulate a concurrency situation where one thread is reading the entry already present in the cache (cache hit)
+        // and the other thread is reading an entry which is not available in the cache (cache miss). The expected behaviour
+        // is for the former thread to succeed while the latter is fetching from rsm.
+        // In this test we simulate the situation using latches. We perform the following operations:
+        // 1. Start the CacheMiss thread and wait until it starts executing rsm.fetchIndex.
+        // 2. Block the CacheMiss thread inside the call to rsm.fetchIndex.
+        // 3. Start the CacheHit thread. Assert that it performs a successful read.
+        // 4. On completion of a successful read by the CacheHit thread, signal the CacheMiss thread to release its block.
+        // 5. Validate that the test passes. If the CacheMiss thread was blocking the CacheHit thread, the test will fail.
+        CountDownLatch latchForCacheHit = new CountDownLatch(1);
+        CountDownLatch latchForCacheMiss = new CountDownLatch(1);
+
+        Runnable readerCacheHit = () -> {
+            // Wait for signal to start executing the read
+            logger.debug("Waiting for signal to begin read from {}", 
Thread.currentThread());
+            try {
+                latchForCacheHit.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            RemoteIndexCache.Entry entry = 
cache.getIndexEntry(metadataList.get(0));
+            assertNotNull(entry);
+            // Signal the CacheMiss to unblock itself
+            logger.debug("Signaling CacheMiss to unblock from {}", 
Thread.currentThread());
+            latchForCacheMiss.countDown();
+        };
+
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), 
any(IndexType.class))).thenAnswer(answer -> {
+            logger.debug("Signaling CacheHit to begin read from {}", 
Thread.currentThread());
+            latchForCacheHit.countDown();
+            logger.debug("Waiting for signal to complete rsm fetch from {}", 
Thread.currentThread());
+            latchForCacheMiss.await();
+            return null;
+        });
+
+        Runnable readerCacheMiss = () -> {
+            int size = metadataList.size();
+            RemoteIndexCache.Entry entry = 
cache.getIndexEntry(metadataList.get(size - 1));
+            assertNotNull(entry);
+        };
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            executor.submit(readerCacheMiss);
+            executor.submit(readerCacheHit);
+            assertTrue(latchForCacheMiss.await(30, TimeUnit.SECONDS));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testReloadCacheAfterClose() throws IOException, RemoteStorageException, InterruptedException {
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        // Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, bringing the count to 2
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(2);
+
+        // Here a new key metadataList(2) is invoked, that should call rsm#fetchIndex
+        // The cache max size is 2, so it will remove one entry and keep the overall size at 2
+        cache.getIndexEntry(metadataList.get(2));
+        assertCacheSize(2);
+        // Calling getIndex on the same entry may or may not call rsm#fetchIndex, it depends on the cache implementation so
+        // we only need to verify that the number of calls is within the expected range.
+        cache.getIndexEntry(metadataList.get(2));
+        assertCacheSize(2);
+        verifyFetchIndexInvocationWithRange(3, 4);
+
+        // Close the cache
+        cache.close();
+
+        // Reload the cache from the disk and check the cache size is the same as earlier
+        RemoteIndexCache reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        assertEquals(2, reloadedCache.internalCache().asMap().size());
+        reloadedCache.close();
+
+        verifyNoMoreInteractions(rsm);
+    }
+
+    @Test
+    public void testRemoveItem() throws InterruptedException, IOException {
+        RemoteLogSegmentId segmentId = rlsMetadata.remoteLogSegmentId();
+        Uuid segmentUuid = segmentId.id();
+        // generate and add entry to cache
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(segmentId);
+        cache.internalCache().put(segmentUuid, spyEntry);
+        assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+        assertFalse(spyEntry.isMarkedForCleanup());
+
+        cache.remove(segmentId.id());
+        assertFalse(cache.internalCache().asMap().containsKey(segmentUuid));
+        TestUtils.waitForCondition(spyEntry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation");
+    }
+
+    @Test
+    public void testRemoveNonExistentItem() throws IOException {
+        // generate and add entry to cache
+        RemoteLogSegmentId segmentId = rlsMetadata.remoteLogSegmentId();
+        Uuid segmentUuid = segmentId.id();
+        // generate and add entry to cache
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(segmentId);
+        cache.internalCache().put(segmentUuid, spyEntry);
+        assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+
+        // remove a random Uuid
+        cache.remove(Uuid.randomUuid());
+        assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+        assertFalse(spyEntry.isMarkedForCleanup());
+    }
+
+    @Test
+    public void testRemoveMultipleItems() throws InterruptedException, IOException {
+        // generate and add entry to cache
+        Map<Uuid, RemoteIndexCache.Entry> uuidAndEntryList = new HashMap<>();
+        for (int i = 0; i < 10; i++) {
+            RemoteLogSegmentId segmentId = RemoteLogSegmentId.generateNew(idPartition);
+            Uuid segmentUuid = segmentId.id();
+            RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(segmentId);
+            uuidAndEntryList.put(segmentUuid, spyEntry);
+
+            cache.internalCache().put(segmentUuid, spyEntry);
+            assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+            assertFalse(spyEntry.isMarkedForCleanup());
+        }
+        cache.removeAll(uuidAndEntryList.keySet());
+        for (RemoteIndexCache.Entry entry : uuidAndEntryList.values()) {
+            TestUtils.waitForCondition(entry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation");
+        }
+    }
+
+    @Test
+    public void testClearCacheAndIndexFilesWhenResizeCache() throws InterruptedException {
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(1, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        RemoteIndexCache.Entry cacheEntry = cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent());
+
+        cache.resizeCacheSize(1L);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+                "Failed to mark cache entry for cleanup after resizing cache.");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+                "Failed to cleanup cache entry after resizing cache.");
+
+        // verify no index files on remote cache dir
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
+                "Offset index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
+                "Txn index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
+                "Time index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+                "Index file marked for deletion should not be present on disk at " + cache.cacheDir());
+
+        assertCacheSize(0);
+    }
+
+    @Test
+    public void testCorrectnessForCacheAndIndexFilesWhenResizeCache() throws IOException, InterruptedException, RemoteStorageException {
+        // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries ->
+        // evict to 1 entry -> resize to 1 entry size -> resize to 2 entries size
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        RemoteIndexCache.Entry cacheEntry = cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent());
+
+        // Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+        cache.resizeCacheSize(1L);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+                "Failed to mark cache entry for cleanup after resizing cache.");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+                "Failed to cleanup cache entry after resizing cache.");
+
+        // verify no index files on remote cache dir
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
+                "Offset index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
+                "Txn index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
+                "Time index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+                "Index file marked for deletion should not be present on disk at " + cache.cacheDir());
+
+        assertCacheSize(0);
+
+        // Increase cache capacity to only store 2 entries
+        cache.resizeCacheSize(2 * estimateEntryBytesSize);
+        assertCacheSize(0);
+
+        RemoteIndexCache.Entry entry0 = cache.getIndexEntry(metadataList.get(0));
+        RemoteIndexCache.Entry entry1 = cache.getIndexEntry(metadataList.get(1));
+        RemoteIndexCache.Entry entry2 = cache.getIndexEntry(metadataList.get(2));
+        List<RemoteIndexCache.Entry> entries = List.of(entry0, entry1, entry2);
+        assertCacheSize(2);
+        EvictionResult result = verifyEntryIsEvicted(metadataList, entries, 1);
+
+        // Reduce cache capacity to only store 1 entry
+        cache.resizeCacheSize(1 * estimateEntryBytesSize);
+        assertCacheSize(1);
+        // After resize, we need to check that an entry is deleted from the cache and from the existing segment metadata
+        List<RemoteIndexCache.Entry> entryInCache = entries.stream().filter(e -> !result.evictedEntries.contains(e)).toList();
+        List<RemoteLogSegmentMetadata> updatedSegmentMetadata = metadataList.stream().filter(e -> !result.evictedSegmentMetadata.contains(e)).toList();
+        verifyEntryIsEvicted(updatedSegmentMetadata, entryInCache, 1);
+
+        // resize to the same size, all entries should be kept
+        cache.resizeCacheSize(1 * estimateEntryBytesSize);
+
+        List<RemoteLogSegmentMetadata> entriesKept = getRemoteLogSegMetadataIsKept(metadataList);
+        // verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
+        verifyEntryIsKept(entriesKept);
+        assertCacheSize(1);
+
+        // increase the size
+        cache.resizeCacheSize(2 * estimateEntryBytesSize);
+
+        // verify all entries are kept
+        verifyEntryIsKept(entriesKept);
+        assertCacheSize(1);
+    }
+
+    private List<RemoteLogSegmentMetadata> getRemoteLogSegMetadataIsKept(List<RemoteLogSegmentMetadata> metadataToVerify) {
+        return metadataToVerify
+                .stream()
+                .filter(s -> cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id()))
+                .toList();
+    }
+
+    record EvictionResult(List<RemoteLogSegmentMetadata> evictedSegmentMetadata, List<RemoteIndexCache.Entry> evictedEntries) { }
+
+    private EvictionResult verifyEntryIsEvicted(List<RemoteLogSegmentMetadata> metadataToVerify, List<RemoteIndexCache.Entry> entriesToVerify, int numOfMarkAsDeleted) throws InterruptedException {
+        TestUtils.waitForCondition(() -> entriesToVerify.stream().filter(RemoteIndexCache.Entry::isMarkedForCleanup).count() == numOfMarkAsDeleted,
+                "Failed to mark evicted cache entry for cleanup after resizing cache.");
+
+        TestUtils.waitForCondition(() -> entriesToVerify.stream().filter(RemoteIndexCache.Entry::isCleanStarted).count() == numOfMarkAsDeleted,
+                "Failed to cleanup evicted cache entry after resizing cache.");
+
+        List<RemoteIndexCache.Entry> entriesIsMarkedForCleanup = entriesToVerify.stream().filter(RemoteIndexCache.Entry::isMarkedForCleanup).toList();
+        List<RemoteIndexCache.Entry> entriesIsCleanStarted = entriesToVerify.stream().filter(RemoteIndexCache.Entry::isCleanStarted).toList();
+        // the marked-for-cleanup entries and the clean-started entries should be the same
+        assertEquals(entriesIsMarkedForCleanup, entriesIsCleanStarted);
+
+        // get the log segment metadata that were evicted
+        List<RemoteLogSegmentMetadata> metadataDeleted = metadataToVerify
+                .stream()
+                .filter(s -> !cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id()))
+                .toList();
+        assertEquals(numOfMarkAsDeleted, metadataDeleted.size());
+        for (RemoteLogSegmentMetadata metadata : metadataDeleted) {
+            // verify no index files for the evicted entries on remote cache dir
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isEmpty(),
+                    "Offset index file for evicted entry should not be present on disk at " + cache.cacheDir());
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isEmpty(),
+                    "Time index file for evicted entry should not be present on disk at " + cache.cacheDir());
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isEmpty(),
+                    "Txn index file for evicted entry should not be present on disk at " + cache.cacheDir());
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+                    "Index file marked for deletion for evicted entry should not be present on disk at " + cache.cacheDir());
+        }
+        return new EvictionResult(metadataDeleted, entriesIsMarkedForCleanup);
+    }
+
+    private void verifyEntryIsEvicted(RemoteLogSegmentMetadata metadataToVerify, RemoteIndexCache.Entry entryToVerify) throws InterruptedException {
+        // wait until `entryToVerify` is marked for deletion
+        TestUtils.waitForCondition(entryToVerify::isMarkedForCleanup,
+            "Failed to mark evicted cache entry for cleanup after resizing cache.");
+        TestUtils.waitForCondition(entryToVerify::isCleanStarted,
+            "Failed to cleanup evicted cache entry after resizing cache.");
+        // verify no index files for `entryToVerify` on remote cache dir
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isEmpty(),
+            "Offset index file for evicted entry should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isEmpty(),
+            "Time index file for evicted entry should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isEmpty(),
+            "Txn index file for evicted entry should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+            "Index file marked for deletion for evicted entry should not be present on disk at " + cache.cacheDir());
+    }
+
+    void verifyEntryIsKept(List<RemoteLogSegmentMetadata> metadataToVerify) {
+        for (RemoteLogSegmentMetadata metadata : metadataToVerify) {
+            assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isPresent());
+            assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isPresent());
+            assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isPresent());
+            assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = IndexType.class, names = {"OFFSET", "TIMESTAMP", "TRANSACTION"})
+    public void testCorruptCacheIndexFileExistsButNotInCache(IndexType indexType) throws IOException, RemoteStorageException {
+        // create corrupted index file in remote index cache
+        createCorruptedIndexFile(indexType, cache.cacheDir());
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        // Test would fail if it throws an exception other than CorruptIndexException
+        Path offsetIndexFile = entry.offsetIndex().file().toPath();
+        Path txnIndexFile = entry.txnIndex().file().toPath();
+        Path timeIndexFile = entry.timeIndex().file().toPath();
+
+        String expectedOffsetIndexFileName = remoteOffsetIndexFileName(rlsMetadata);
+        String expectedTimeIndexFileName = remoteTimeIndexFileName(rlsMetadata);
+        String expectedTxnIndexFileName = remoteTransactionIndexFileName(rlsMetadata);
+
+        assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName().toString());
+        assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName().toString());
+        assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName().toString());
+
+        // assert that parent directory for the index files is correct
+        assertEquals(DIR_NAME, offsetIndexFile.getParent().getFileName().toString(),
+                "offsetIndex=" + offsetIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, txnIndexFile.getParent().getFileName().toString(),
+                "txnIndex=" + txnIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, timeIndexFile.getParent().getFileName().toString(),
+                "timeIndex=" + timeIndexFile + " is created under incorrect parent");
+
+        // the file is corrupted, so it should be fetched from remote storage again
+        verifyFetchIndexInvocation(1);
+    }
+
+    @Test
+    public void testConcurrentRemoveReadForCache() throws IOException, InterruptedException, ExecutionException {
+        // Create a spy Cache Entry
+        RemoteLogSegmentMetadata rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
+
+        TimeIndex timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)));
+        TransactionIndex txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)));
+        OffsetIndex offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)));
+
+        RemoteIndexCache.Entry spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex));
+        cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
+
+        assertCacheSize(1);
+
+        CountDownLatch latchForCacheRead = new CountDownLatch(1);
+        CountDownLatch latchForCacheRemove = new CountDownLatch(1);
+        CountDownLatch latchForTestWait = new CountDownLatch(1);
+
+        AtomicInteger markForCleanupCallCount = new AtomicInteger(0);
+
+        doAnswer(invocation -> {
+            markForCleanupCallCount.incrementAndGet();
+
+            if (markForCleanupCallCount.get() == 1) {
+                // Signal the CacheRead to unblock itself
+                latchForCacheRead.countDown();
+                // Wait for signal to start renaming the files
+                latchForCacheRemove.await();
+                // Calling the markForCleanup() actual method to start renaming the files
+                invocation.callRealMethod();
+                // Signal TestWait to unblock itself so that the test can be completed
+                latchForTestWait.countDown();
+            }
+            return null;
+        }).when(spyEntry).markForCleanup();
+
+        Runnable removeCache = () -> cache.remove(rlsMetadata.remoteLogSegmentId().id());
+
+        Runnable readCache = () -> {
+            // Wait for signal to start CacheRead
+            try {
+                latchForCacheRead.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            // Signal the CacheRemove to start renaming the files
+            latchForCacheRemove.countDown();
+        };
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            Future<?> removeCacheFuture = executor.submit(removeCache);
+            Future<?> readCacheFuture = executor.submit(readCache);
+
+            // Verify both tasks are completed without any exception
+            removeCacheFuture.get();
+            readCacheFuture.get();
+
+            // Wait for signal to complete the test
+            latchForTestWait.await();
+
+            // We can't determine whether the read thread or the remove thread will go first, so:
+            // 1. If the read thread goes first, the cache file should not exist and the cache size should be zero.
+            // 2. If the remove thread goes first, the cache file should be present and the cache size should be one.
+            // So basically here we are making sure that if the cache entry exists, the cache file should exist,
+            // and if the cache entry does not exist, the cache file should not exist.
+            if (getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent()) {
+                assertCacheSize(1);
+            } else {
+                assertCacheSize(0);
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testMultipleIndexEntriesExecutionInCorruptException() throws IOException, RemoteStorageException {
+        reset(rsm);
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> {
+            RemoteLogSegmentMetadata metadata = ans.getArgument(0);
+            IndexType indexType = ans.getArgument(1);
+            OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir);
+            TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir);
+            TransactionIndex txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir);
+            maybeAppendIndexEntries(offsetIdx, timeIdx);
+            // Create corrupted index file
+            createCorruptTimeIndexOffsetFile(tpDir);
+            return switch (indexType) {
+                case OFFSET -> new FileInputStream(offsetIdx.file());
+                case TIMESTAMP -> new FileInputStream(timeIdx.file());
+                case TRANSACTION -> new FileInputStream(txnIdx.file());
+                case LEADER_EPOCH -> null; // leader-epoch-cache is not accessed.
+                case PRODUCER_SNAPSHOT -> null; // producer-snapshot is not accessed.
+            };
+        });
+
+        assertThrows(CorruptIndexException.class, () -> cache.getIndexEntry(rlsMetadata));
+        assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()));
+        verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP));
+        verifyFetchIndexInvocation(0, List.of(IndexType.TRANSACTION));
+        // Current status
+        // (cache is null)
+        // RemoteCacheDir contains
+        // 1. Offset index file which is fine and not corrupted
+        // 2. Time index file which is corrupted
+        // What should be the code flow in the next execution:
+        // 1. No rsm call for fetching the offset index file.
+        // 2. Time index file should be fetched from remote storage again as it was corrupted in the first execution.
+        // 3. Transaction index file should be fetched from remote storage.
+        reset(rsm);
+        // delete all files created in tpDir
+        List<Path> paths = Files.walk(tpDir.toPath(), 1)
+                .filter(Files::isRegularFile)
+                .toList();
+        for (Path path : paths) {
+            Files.deleteIfExists(path);
+        }
+        // rsm should return no corrupted file in the 2nd execution
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> {
+            RemoteLogSegmentMetadata metadata = ans.getArgument(0);
+            IndexType indexType = ans.getArgument(1);
+            OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir);
+            TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir);
+            TransactionIndex txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir);
+            maybeAppendIndexEntries(offsetIdx, timeIdx);
+            return switch (indexType) {
+                case OFFSET -> new FileInputStream(offsetIdx.file());
+                case TIMESTAMP -> new FileInputStream(timeIdx.file());
+                case TRANSACTION -> new FileInputStream(txnIdx.file());
+                case LEADER_EPOCH -> null; // leader-epoch-cache is not accessed.
+                case PRODUCER_SNAPSHOT -> null; // producer-snapshot is not accessed.
+            };
+        });
+        cache.getIndexEntry(rlsMetadata);
+        // rsm should not be called to fetch offset Index
+        verifyFetchIndexInvocation(0, List.of(IndexType.OFFSET));
+        verifyFetchIndexInvocation(1, List.of(IndexType.TIMESTAMP));
+        // Transaction index would be fetched again
+        // as previous getIndexEntry failed before fetchTransactionIndex
+        verifyFetchIndexInvocation(1, List.of(IndexType.TRANSACTION));
+    }
+
+    @Test
+    public void testIndexFileAlreadyExistOnDiskButNotInCache() throws InterruptedException, IOException, RemoteStorageException {
+        File remoteIndexCacheDir = cache.cacheDir();
+        String tempSuffix = ".tmptest";
+
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        verifyFetchIndexInvocation(1);
+        // copy files with temporary name
+        Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)));
+        Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)));
+        Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)));
+
+        cache.remove(rlsMetadata.remoteLogSegmentId().id());
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(entry::isMarkedForCleanup,
+                "Failed to mark cache entry for cleanup after invalidation");
+        TestUtils.waitForCondition(entry::isCleanStarted,
+                "Failed to cleanup cache entry after invalidation");
+
+        // restore index files
+        renameRemoteCacheIndexFileFromDisk(tempSuffix, remoteIndexCacheDir, tempSuffix);
+        // validate that the cache entry for the above key is null
+        assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()));
+        cache.getIndexEntry(rlsMetadata);
+        // Index files already exist, so rsm should not fetch them again.
+        verifyFetchIndexInvocation(1);
+        // verify index files on disk
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), "Offset index file should be present on disk at " + remoteIndexCacheDir.toPath());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), "Txn index file should be present on disk at " + remoteIndexCacheDir.toPath());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), "Time index file should be present on disk at " + remoteIndexCacheDir.toPath());
+    }
+
+    void renameRemoteCacheIndexFileFromDisk(String suffix, File remoteIndexCacheDir, String tempSuffix) throws IOException {
+        List<Path> paths = Files.walk(remoteIndexCacheDir.toPath())
+                .filter(Files::isRegularFile)
+                .filter(path -> path.getFileName().toString().endsWith(suffix)).toList();
+        for (Path path : paths) {
+            Utils.atomicMoveWithFallback(path, path.resolveSibling(path.getFileName().toString().replace(tempSuffix, "")));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = IndexType.class, names = {"OFFSET", "TIMESTAMP", "TRANSACTION"})
+    public void testRSMReturnCorruptedIndexFile(IndexType testIndexType) throws RemoteStorageException {
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> {
+            RemoteLogSegmentMetadata metadata = ans.getArgument(0);
+            IndexType indexType = ans.getArgument(1);
+            OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir);
+            TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir);
+            TransactionIndex txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir);
+            maybeAppendIndexEntries(offsetIdx, timeIdx);
+            // Create a corrupt index file to return from RSM
+            createCorruptedIndexFile(indexType, tpDir);

Review Comment:
   If I pass `indexType` then it's always `OFFSET`. I think this needs to be 
`testIndexType`, as it was in Scala, to trigger all the code paths. Granted 
this test is a bit convoluted and could probably be refactored.
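Something like this (a minimal sketch of the suggested change, reusing the test's own `testIndexType` parameter and `createCorruptedIndexFile` helper):

```java
// Corrupt the index for the parameterized type under test (the @EnumSource value),
// not the type currently being fetched by the mocked RSM: the first fetch is
// always OFFSET, so passing `indexType` would only ever corrupt the offset index.
createCorruptedIndexFile(testIndexType, tpDir);
```

That way each @EnumSource value (OFFSET, TIMESTAMP, TRANSACTION) exercises its own corruption path.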


