mimaison commented on code in PR #19469:
URL: https://github.com/apache/kafka/pull/19469#discussion_r2042607005


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java:
##########
@@ -0,0 +1,1207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.DIR_NAME;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFileName;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFileName;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFileName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class RemoteIndexCacheTest {
+
+    private final long defaultRemoteIndexCacheSizeBytes = 1024 * 1024L;
+    private final Logger logger = LoggerFactory.getLogger(RemoteIndexCacheTest.class);
+    private final MockTime time = new MockTime();
+    private final int brokerId = 1;
+    private final long baseOffset = Integer.MAX_VALUE + 101337L; // start with a base offset which is a long
+    private final long lastOffset = baseOffset + 30L;
+    private final int segmentSize = 1024;
+    private final RemoteStorageManager rsm = mock(RemoteStorageManager.class);
+    private RemoteIndexCache cache;
+    private RemoteLogSegmentMetadata rlsMetadata;
+    private File logDir;
+    private File tpDir;
+    private TopicIdPartition idPartition;
+
+    @BeforeEach
+    public void setup() throws IOException, RemoteStorageException {
+        idPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        logDir = TestUtils.tempDirectory("kafka-" + this.getClass().getSimpleName());
+        tpDir = new File(logDir, idPartition.toString());
+        Files.createDirectory(tpDir.toPath());
+
+        RemoteLogSegmentId remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition);
+        rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
+
+        cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString());
+
+        mockRsmFetchIndex(rsm);
+    }
+
+    @AfterEach
+    public void cleanup() {
+        reset(rsm);
+        // the files created for the test will be deleted automatically on thread exit since we use temp dir
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory
+        // will be deleted at the end of test.
+        try {
+            Utils.delete(logDir);
+        } catch (IOException ioe) {
+            // ignore
+        }
+        // Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach
+        // because this may throw an exception and prevent cleanup after it
+        TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true);
+    }
+
+    @Test
+    public void testIndexFileNameAndLocationOnDisk() {
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        Path offsetIndexFile = entry.offsetIndex().file().toPath();
+        Path txnIndexFile = entry.txnIndex().file().toPath();
+        Path timeIndexFile = entry.timeIndex().file().toPath();
+
+        String expectedOffsetIndexFileName = remoteOffsetIndexFileName(rlsMetadata);
+        String expectedTimeIndexFileName = remoteTimeIndexFileName(rlsMetadata);
+        String expectedTxnIndexFileName = remoteTransactionIndexFileName(rlsMetadata);
+
+        assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName().toString());
+        assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName().toString());
+        assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName().toString());
+
+        // assert that parent directory for the index files is correct
+        assertEquals(DIR_NAME, offsetIndexFile.getParent().getFileName().toString(),
+                "offsetIndex=" + offsetIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, txnIndexFile.getParent().getFileName().toString(),
+                "txnIndex=" + txnIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, timeIndexFile.getParent().getFileName().toString(),
+                "timeIndex=" + timeIndexFile + " is created under incorrect parent");
+    }
+
+    @Test
+    public void testFetchIndexFromRemoteStorage() throws RemoteStorageException {
+        OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
+        OffsetPosition offsetPosition1 = offsetIndex.entry(1);
+        // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+        int resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset);
+        assertEquals(offsetPosition1.position, resultPosition);
+        verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP));
+
+        // this should not cause fetching index from RemoteStorageManager as it is already fetched earlier
+        reset(rsm);
+        OffsetPosition offsetPosition2 = offsetIndex.entry(2);
+        int resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset);
+        assertEquals(offsetPosition2.position, resultPosition2);
+        assertNotNull(cache.getIndexEntry(rlsMetadata));
+        verifyNoInteractions(rsm);
+    }
+
+    @Test
+    public void testFetchIndexForMissingTransactionIndex() throws RemoteStorageException {
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> {
+            RemoteLogSegmentMetadata metadata = ans.getArgument(0);
+            IndexType indexType = ans.getArgument(1);
+            OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir);
+            TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir);
+            maybeAppendIndexEntries(offsetIdx, timeIdx);
+            return switch (indexType) {
+                case OFFSET -> new FileInputStream(offsetIdx.file());
+                case TIMESTAMP -> new FileInputStream(timeIdx.file());
+                    // Throw RemoteResourceNotFoundException since transaction index is not available
+                case TRANSACTION -> throw new RemoteResourceNotFoundException("txn index not found");
+                case LEADER_EPOCH -> null; // leader-epoch-cache is not accessed.
+                case PRODUCER_SNAPSHOT -> null;  // producer-snapshot is not accessed.
+            };
+        });
+
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        // Verify an empty file is created in the cache directory
+        assertTrue(entry.txnIndex().file().exists());
+        assertEquals(0, entry.txnIndex().file().length());
+    }
+
+    @Test
+    public void testPositionForNonExistingIndexFromRemoteStorage() {
+        OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
+        int lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset());
+        long greaterOffsetThanLastOffset = offsetIndex.lastOffset() + 1;
+        assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset));
+
+        // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than the last entry in the offset index.
+        OffsetPosition nonExistentOffsetPosition = new OffsetPosition(baseOffset, 0);
+        long lowerOffsetThanBaseOffset = offsetIndex.baseOffset() - 1;
+        assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset));
+    }
+
+    @Test
+    public void testCacheEntryExpiry() throws IOException, RemoteStorageException, InterruptedException {
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        // Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, making the count to 2
+        cache.getIndexEntry(metadataList.get(0));
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(2);
+
+        // Getting index for metadataList.last should call rsm#fetchIndex
+        // to populate this entry one of the other 2 entries will be evicted. We don't know which one since it's based on
+        // a probabilistic formula for Window TinyLfu. See docs for RemoteIndexCache
+        int size = metadataList.size();
+        assertNotNull(cache.getIndexEntry(metadataList.get(size - 1)));
+        assertAtLeastOnePresent(cache, metadataList.get(1).remoteLogSegmentId().id(), metadataList.get(0).remoteLogSegmentId().id());
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(3);
+
+        // getting index for last expired entry should call rsm#fetchIndex as that entry was expired earlier
+        Optional<RemoteLogSegmentMetadata> missingEntryOpt = Optional.empty();
+        for (RemoteLogSegmentMetadata entry : metadataList) {
+            Uuid segmentId = entry.remoteLogSegmentId().id();
+            if (!cache.internalCache().asMap().containsKey(segmentId)) {
+                missingEntryOpt = Optional.of(entry);
+                break;
+            }
+        }
+        assertFalse(missingEntryOpt.isEmpty());
+        cache.getIndexEntry(missingEntryOpt.get());
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(4);
+    }
+
+    @Test
+    public void testGetIndexAfterCacheClose() throws IOException, RemoteStorageException, InterruptedException {
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+
+        cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        cache.close();
+
+        // Check IllegalStateException is thrown when index is accessed after it is closed.
+        assertThrows(IllegalStateException.class, () -> cache.getIndexEntry(metadataList.get(0)));
+    }
+
+    @Test
+    public void testCloseIsIdempotent() throws IOException {
+        // generate and add entry to cache
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
+
+        cache.close();
+        cache.close();
+
+        // verify that entry is only closed once
+        verify(spyEntry).close();
+    }
+
+    @Test
+    public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedException {
+        Uuid internalIndexKey = rlsMetadata.remoteLogSegmentId().id();
+        RemoteIndexCache.Entry cacheEntry = generateSpyCacheEntry();
+
+        // verify index files on disk
+        assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), "Offset index file should be present on disk at " + tpDir.toPath());
+        assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), "Txn index file should be present on disk at " + tpDir.toPath());
+        assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), "Time index file should be present on disk at " + tpDir.toPath());
+
+        // add the spied entry into the cache, it will overwrite the non-spied entry
+        cache.internalCache().put(internalIndexKey, cacheEntry);
+
+        // no expired entries yet
+        assertEquals(0, cache.expiredIndexes().size(), "expiredIndex queue should be zero at start of test");
+
+        // call remove function to mark the entry for removal
+        cache.remove(internalIndexKey);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+                "Failed to mark cache entry for cleanup after invalidation");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+                "Failed to cleanup cache entry after invalidation");
+
+        // first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
+        verify(cacheEntry, times(2)).markForCleanup();
+        // after that async it will be cleaned up
+        verify(cacheEntry).cleanup();
+
+        // verify that index(s) rename is only called 1 time
+        verify(cacheEntry.timeIndex()).renameTo(any(File.class));
+        verify(cacheEntry.offsetIndex()).renameTo(any(File.class));
+        verify(cacheEntry.txnIndex()).renameTo(any(File.class));
+
+        // verify no index files on disk
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(),
+                "Offset index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(),
+                "Txn index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(),
+                "Time index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(),
+                "Index file marked for deletion should not be present on disk at " + tpDir.toPath());
+    }
+
+    private Optional<Path> getIndexFileFromDisk(String suffix) throws IOException {
+        return Files.walk(tpDir.toPath())
+                .filter(Files::isRegularFile)
+                .filter(path -> path.getFileName().toString().endsWith(suffix))
+                .findAny();
+    }
+
+    @Test
+    public void testCleanerThreadShutdown() throws IOException, InterruptedException {

Review Comment:
   The logic in this test is a bit different from the original. From my testing, the original test did not actually throw the `RuntimeException` meant to validate that the cleaner thread could handle it, since `cleanup()` is effectively only called from `remove()` and not by invalidating the entry in the underlying cache.
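
   For illustration, here is a rough sketch (not the original test) of how that exception path could actually be exercised, reusing helpers that already exist in this file such as `generateSpyCacheEntry()`: stub `cleanup()` to throw and then go through `remove()`.

   ```java
   // Sketch only: remove() is the path that actually reaches cleanup(), so stub cleanup()
   // to throw and check the entry still gets marked for cleanup by the cleaner thread.
   RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
   doThrow(new RuntimeException("cleanup failed")).when(spyEntry).cleanup();
   cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);

   cache.remove(rlsMetadata.remoteLogSegmentId().id());

   // The cleaner thread should survive the exception; the entry should still be
   // marked for cleanup even though cleanup() itself failed.
   TestUtils.waitForCondition(spyEntry::isMarkedForCleanup,
           "Entry should be marked for cleanup even when cleanup() throws");
   ```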


