mimaison commented on code in PR #19469:
URL: https://github.com/apache/kafka/pull/19469#discussion_r2042785641
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java:
##########
@@ -0,0 +1,1207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.DIR_NAME;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFileName;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFileName;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFile;
+import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFileName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class RemoteIndexCacheTest {
+
+    private final long defaultRemoteIndexCacheSizeBytes = 1024 * 1024L;
+    private final Logger logger = LoggerFactory.getLogger(RemoteIndexCacheTest.class);
+    private final MockTime time = new MockTime();
+    private final int brokerId = 1;
+    private final long baseOffset = Integer.MAX_VALUE + 101337L; // start with a base offset which is a long
+    private final long lastOffset = baseOffset + 30L;
+    private final int segmentSize = 1024;
+    private final RemoteStorageManager rsm = mock(RemoteStorageManager.class);
+    private RemoteIndexCache cache;
+    private RemoteLogSegmentMetadata rlsMetadata;
+    private File logDir;
+    private File tpDir;
+    private TopicIdPartition idPartition;
+
+    @BeforeEach
+    public void setup() throws IOException, RemoteStorageException {
+        idPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        logDir = TestUtils.tempDirectory("kafka-" + this.getClass().getSimpleName());
+        tpDir = new File(logDir, idPartition.toString());
+        Files.createDirectory(tpDir.toPath());
+
+        RemoteLogSegmentId remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition);
+        rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
+
+        cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString());
+
+        mockRsmFetchIndex(rsm);
+    }
+
+    @AfterEach
+    public void cleanup() {
+        reset(rsm);
+        // the files created for the test will be deleted automatically on thread exit since we use temp dir
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory
+        // will be deleted at the end of test.
+        try {
+            Utils.delete(logDir);
+        } catch (IOException ioe) {
+            // ignore
+        }
+        // Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach
+        // because this may throw an exception and prevent cleanup after it
+        TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true);
+    }
+
+    @Test
+    public void testIndexFileNameAndLocationOnDisk() {
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        Path offsetIndexFile = entry.offsetIndex().file().toPath();
+        Path txnIndexFile = entry.txnIndex().file().toPath();
+        Path timeIndexFile = entry.timeIndex().file().toPath();
+
+        String expectedOffsetIndexFileName = remoteOffsetIndexFileName(rlsMetadata);
+        String expectedTimeIndexFileName = remoteTimeIndexFileName(rlsMetadata);
+        String expectedTxnIndexFileName = remoteTransactionIndexFileName(rlsMetadata);
+
+        assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName().toString());
+        assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName().toString());
+        assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName().toString());
+
+        // assert that parent directory for the index files is correct
+        assertEquals(DIR_NAME, offsetIndexFile.getParent().getFileName().toString(),
+            "offsetIndex=" + offsetIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, txnIndexFile.getParent().getFileName().toString(),
+            "txnIndex=" + txnIndexFile + " is created under incorrect parent");
+        assertEquals(DIR_NAME, timeIndexFile.getParent().getFileName().toString(),
+            "timeIndex=" + timeIndexFile + " is created under incorrect parent");
+    }
+
+    @Test
+    public void testFetchIndexFromRemoteStorage() throws RemoteStorageException {
+        OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
+        OffsetPosition offsetPosition1 = offsetIndex.entry(1);
+        // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+        int resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset);
+        assertEquals(offsetPosition1.position, resultPosition);
+        verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP));
+
+        // this should not cause fetching index from RemoteStorageManager as it is already fetched earlier
+        reset(rsm);
+        OffsetPosition offsetPosition2 = offsetIndex.entry(2);
+        int resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset);
+        assertEquals(offsetPosition2.position, resultPosition2);
+        assertNotNull(cache.getIndexEntry(rlsMetadata));
+        verifyNoInteractions(rsm);
+    }
+
+    @Test
+    public void testFetchIndexForMissingTransactionIndex() throws RemoteStorageException {
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> {
+            RemoteLogSegmentMetadata metadata = ans.getArgument(0);
+            IndexType indexType = ans.getArgument(1);
+            OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir);
+            TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir);
+            maybeAppendIndexEntries(offsetIdx, timeIdx);
+            return switch (indexType) {
+                case OFFSET -> new FileInputStream(offsetIdx.file());
+                case TIMESTAMP -> new FileInputStream(timeIdx.file());
+                // Throw RemoteResourceNotFoundException since transaction index is not available
+                case TRANSACTION -> throw new RemoteResourceNotFoundException("txn index not found");
+                case LEADER_EPOCH -> null; // leader-epoch-cache is not accessed.
+                case PRODUCER_SNAPSHOT -> null; // producer-snapshot is not accessed.
+            };
+        });
+
+        RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
+        // Verify an empty file is created in the cache directory
+        assertTrue(entry.txnIndex().file().exists());
+        assertEquals(0, entry.txnIndex().file().length());
+    }
+
+    @Test
+    public void testPositionForNonExistingIndexFromRemoteStorage() {
+        OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
+        int lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset());
+        long greaterOffsetThanLastOffset = offsetIndex.lastOffset() + 1;
+        assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset));
+
+        // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than the last entry in the offset index.
+        OffsetPosition nonExistentOffsetPosition = new OffsetPosition(baseOffset, 0);
+        long lowerOffsetThanBaseOffset = offsetIndex.baseOffset() - 1;
+        assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset));
+    }
+
+    @Test
+    public void testCacheEntryExpiry() throws IOException, RemoteStorageException, InterruptedException {
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        // Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, making the count 2
+        cache.getIndexEntry(metadataList.get(0));
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(2);
+
+        // Getting index for metadataList.last should call rsm#fetchIndex
+        // to populate this entry one of the other 2 entries will be evicted. We don't know which one since it's based on
+        // a probabilistic formula for Window TinyLfu. See docs for RemoteIndexCache
+        int size = metadataList.size();
+        assertNotNull(cache.getIndexEntry(metadataList.get(size - 1)));
+        assertAtLeastOnePresent(cache, metadataList.get(1).remoteLogSegmentId().id(), metadataList.get(0).remoteLogSegmentId().id());
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(3);
+
+        // getting index for last expired entry should call rsm#fetchIndex as that entry was expired earlier
+        Optional<RemoteLogSegmentMetadata> missingEntryOpt = Optional.empty();
+        for (RemoteLogSegmentMetadata entry : metadataList) {
+            Uuid segmentId = entry.remoteLogSegmentId().id();
+            if (!cache.internalCache().asMap().containsKey(segmentId)) {
+                missingEntryOpt = Optional.of(entry);
+                break;
+            }
+        }
+        assertFalse(missingEntryOpt.isEmpty());
+        cache.getIndexEntry(missingEntryOpt.get());
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(4);
+    }
+
+    @Test
+    public void testGetIndexAfterCacheClose() throws IOException, RemoteStorageException, InterruptedException {
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+
+        cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        cache.close();
+
+        // Check IllegalStateException is thrown when index is accessed after it is closed.
+        assertThrows(IllegalStateException.class, () -> cache.getIndexEntry(metadataList.get(0)));
+    }
+
+    @Test
+    public void testCloseIsIdempotent() throws IOException {
+        // generate and add entry to cache
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
+
+        cache.close();
+        cache.close();
+
+        // verify that entry is only closed once
+        verify(spyEntry).close();
+    }
+
+    @Test
+    public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedException {
+        Uuid internalIndexKey = rlsMetadata.remoteLogSegmentId().id();
+        RemoteIndexCache.Entry cacheEntry = generateSpyCacheEntry();
+
+        // verify index files on disk
+        assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), "Offset index file should be present on disk at " + tpDir.toPath());
+        assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), "Txn index file should be present on disk at " + tpDir.toPath());
+        assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), "Time index file should be present on disk at " + tpDir.toPath());
+
+        // add the spied entry into the cache, it will overwrite the non-spied entry
+        cache.internalCache().put(internalIndexKey, cacheEntry);
+
+        // no expired entries yet
+        assertEquals(0, cache.expiredIndexes().size(), "expiredIndex queue should be zero at start of test");
+
+        // call remove function to mark the entry for removal
+        cache.remove(internalIndexKey);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+            "Failed to mark cache entry for cleanup after invalidation");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+            "Failed to cleanup cache entry after invalidation");
+
+        // first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
+        verify(cacheEntry, times(2)).markForCleanup();
+        // after that it will be cleaned up asynchronously
+        verify(cacheEntry).cleanup();
+
+        // verify that index(s) rename is only called 1 time
+        verify(cacheEntry.timeIndex()).renameTo(any(File.class));
+        verify(cacheEntry.offsetIndex()).renameTo(any(File.class));
+        verify(cacheEntry.txnIndex()).renameTo(any(File.class));
+
+        // verify no index files on disk
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(),
+            "Offset index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(),
+            "Txn index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(),
+            "Time index file should not be present on disk at " + tpDir.toPath());
+        assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(),
+            "Index file marked for deletion should not be present on disk at " + tpDir.toPath());
+    }
+
+    private Optional<Path> getIndexFileFromDisk(String suffix) throws IOException {
+        return Files.walk(tpDir.toPath())
+            .filter(Files::isRegularFile)
+            .filter(path -> path.getFileName().toString().endsWith(suffix))
+            .findAny();
+    }
+
+    @Test
+    public void testCleanerThreadShutdown() throws IOException, InterruptedException {
+        // cache is empty at beginning
+        assertTrue(cache.internalCache().asMap().isEmpty());
+        // verify that cleaner thread is running
+        Set<Thread> threads = getRunningCleanerThread();
+        assertEquals(1, threads.size(),
+            "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
+        // create a new entry
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        // an exception should not close the cleaner thread
+        doThrow(new RuntimeException("kaboom! I am expected exception in unit test.")).when(spyEntry).cleanup();
+        Uuid key = Uuid.randomUuid();
+        cache.internalCache().put(key, spyEntry);
+        // trigger cleanup
+        cache.remove(key);
+        // Give the cleaner thread some time to throw an exception
+        Thread.sleep(100);
+        verify(spyEntry, times(1)).cleanup();
+        // Verify that Cleaner thread is still running even when exception is thrown in doWork()
+        threads = getRunningCleanerThread();
+        assertEquals(1, threads.size(),
+            "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
+        // close the cache properly
+        cache.close();
+        // verify that the thread is closed properly
+        threads = getRunningCleanerThread();
+        assertTrue(threads.isEmpty(), "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
+        // if the thread is correctly being shutdown it will not be running
+        assertFalse(cache.cleanerThread().isRunning(), "Unexpected thread state=running. Check error logs.");
+    }
+
+    @Test
+    public void testClose() throws IOException, InterruptedException {
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
+        cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
+
+        TestUtils.waitForCondition(() -> cache.cleanerThread().isStarted(), "Cleaner thread should be started");
+
+        // close the cache
+        cache.close();
+
+        // closing the cache should close the entry
+        verify(spyEntry).close();
+
+        // close for all index entries must be invoked
+        verify(spyEntry.txnIndex()).close();
+        verify(spyEntry.offsetIndex()).close();
+        verify(spyEntry.timeIndex()).close();
+
+        // index files must not be deleted
+        verify(spyEntry.txnIndex(), times(0)).deleteIfExists();
+        verify(spyEntry.offsetIndex(), times(0)).deleteIfExists();
+        verify(spyEntry.timeIndex(), times(0)).deleteIfExists();
+
+        // verify cleaner thread is shutdown
+        assertTrue(cache.cleanerThread().isShutdownComplete());
+    }
+
+    @Test
+    public void testConcurrentReadWriteAccessForCache() throws InterruptedException, RemoteStorageException {
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP));
+        reset(rsm);
+
+        // Simulate a concurrency situation where one thread is reading the entry already present in the cache (cache hit)
+        // and the other thread is reading an entry which is not available in the cache (cache miss). The expected behaviour
+        // is for the former thread to succeed while the latter is fetching from rsm.
+        // In this test we simulate the situation using latches. We perform the following operations:
+        // 1. Start the CacheMiss thread and wait until it starts executing the rsm.fetchIndex
+        // 2. Block the CacheMiss thread inside the call to rsm.fetchIndex.
+        // 3. Start the CacheHit thread. Assert that it performs a successful read.
+        // 4. On completion of successful read by CacheHit thread, signal the CacheMiss thread to release its block.
+        // 5. Validate that the test passes. If the CacheMiss thread was blocking the CacheHit thread, the test will fail.
+        CountDownLatch latchForCacheHit = new CountDownLatch(1);
+        CountDownLatch latchForCacheMiss = new CountDownLatch(1);
+
+        Runnable readerCacheHit = () -> {
+            // Wait for signal to start executing the read
+            logger.debug("Waiting for signal to begin read from {}", Thread.currentThread());
+            try {
+                latchForCacheHit.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            RemoteIndexCache.Entry entry = cache.getIndexEntry(metadataList.get(0));
+            assertNotNull(entry);
+            // Signal the CacheMiss to unblock itself
+            logger.debug("Signaling CacheMiss to unblock from {}", Thread.currentThread());
+            latchForCacheMiss.countDown();
+        };
+
+        when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(answer -> {
+            logger.debug("Signaling CacheHit to begin read from {}", Thread.currentThread());
+            latchForCacheHit.countDown();
+            logger.debug("Waiting for signal to complete rsm fetch from {}", Thread.currentThread());
+            latchForCacheMiss.await();
+            return null;
+        });
+
+        Runnable readerCacheMiss = () -> {
+            int size = metadataList.size();
+            RemoteIndexCache.Entry entry = cache.getIndexEntry(metadataList.get(size - 1));
+            assertNotNull(entry);
+        };
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            executor.submit(readerCacheMiss);
+            executor.submit(readerCacheHit);
+            assertTrue(latchForCacheMiss.await(30, TimeUnit.SECONDS));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testReloadCacheAfterClose() throws IOException, RemoteStorageException, InterruptedException {
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        // close existing cache created in test setup before creating a new one
+        Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
+        cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        verifyFetchIndexInvocation(1);
+
+        // Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, making the count 2
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+        cache.getIndexEntry(metadataList.get(1));
+        assertCacheSize(2);
+        verifyFetchIndexInvocation(2);
+
+        // Here a new key metadataList(2) is invoked, that should call rsm#fetchIndex
+        // The cache max size is 2, it will remove one entry and keep the overall size to 2
+        cache.getIndexEntry(metadataList.get(2));
+        assertCacheSize(2);
+        // Calling getIndex on the same entry may or may not call rsm#fetchIndex, depending on the cache implementation,
+        // so we only verify that the number of calls is within the expected range.
+        cache.getIndexEntry(metadataList.get(2));
+        assertCacheSize(2);
+        verifyFetchIndexInvocationWithRange(3, 4);
+
+        // Close the cache
+        cache.close();
+
+        // Reload the cache from the disk and check the cache size is same as earlier
+        RemoteIndexCache reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
+        assertEquals(2, reloadedCache.internalCache().asMap().size());
+        reloadedCache.close();
+
+        verifyNoMoreInteractions(rsm);
+    }
+
+    @Test
+    public void testRemoveItem() throws InterruptedException, IOException {
+        RemoteLogSegmentId segmentId = rlsMetadata.remoteLogSegmentId();
+        Uuid segmentUuid = segmentId.id();
+        // generate and add entry to cache
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(segmentId);
+        cache.internalCache().put(segmentUuid, spyEntry);
+        assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+        assertFalse(spyEntry.isMarkedForCleanup());
+
+        cache.remove(segmentId.id());
+        assertFalse(cache.internalCache().asMap().containsKey(segmentUuid));
+        TestUtils.waitForCondition(spyEntry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation");
+    }
+
+    @Test
+    public void testRemoveNonExistentItem() throws IOException {
+        // generate and add entry to cache
+        RemoteLogSegmentId segmentId = rlsMetadata.remoteLogSegmentId();
+        Uuid segmentUuid = segmentId.id();
+        RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(segmentId);
+        cache.internalCache().put(segmentUuid, spyEntry);
+        assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+
+        // remove a random Uuid
+        cache.remove(Uuid.randomUuid());
+        assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+        assertFalse(spyEntry.isMarkedForCleanup());
+    }
+
+    @Test
+    public void testRemoveMultipleItems() throws InterruptedException, IOException {
+        // generate and add entries to cache
+        Map<Uuid, RemoteIndexCache.Entry> uuidAndEntryList = new HashMap<>();
+        for (int i = 0; i < 10; i++) {
+            RemoteLogSegmentId segmentId = RemoteLogSegmentId.generateNew(idPartition);
+            Uuid segmentUuid = segmentId.id();
+            RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(segmentId);
+            uuidAndEntryList.put(segmentUuid, spyEntry);
+
+            cache.internalCache().put(segmentUuid, spyEntry);
+            assertTrue(cache.internalCache().asMap().containsKey(segmentUuid));
+            assertFalse(spyEntry.isMarkedForCleanup());
+        }
+        cache.removeAll(uuidAndEntryList.keySet());
+        for (RemoteIndexCache.Entry entry : uuidAndEntryList.values()) {
+            TestUtils.waitForCondition(entry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation");
+        }
+    }
+
+    @Test
+    public void testClearCacheAndIndexFilesWhenResizeCache() throws InterruptedException {
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(1, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        RemoteIndexCache.Entry cacheEntry = cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent());
+
+        cache.resizeCacheSize(1L);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+            "Failed to mark cache entry for cleanup after resizing cache.");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+            "Failed to cleanup cache entry after resizing cache.");
+
+        // verify no index files on remote cache dir
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
+            "Offset index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
+            "Txn index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
+            "Time index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+            "Index file marked for deletion should not be present on disk at " + cache.cacheDir());
+
+        assertCacheSize(0);
+    }
+
+    @Test
+    public void testCorrectnessForCacheAndIndexFilesWhenResizeCache() throws IOException, InterruptedException, RemoteStorageException {
+        // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries ->
+        // evict to 1 entry -> resize to 1 entry size -> resize to 2 entries size
+        long estimateEntryBytesSize = estimateOneEntryBytesSize();
+        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
+
+        assertCacheSize(0);
+        // getIndex for first time will call rsm#fetchIndex
+        RemoteIndexCache.Entry cacheEntry = cache.getIndexEntry(metadataList.get(0));
+        assertCacheSize(1);
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent());
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent());
+
+        // Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+        cache.resizeCacheSize(1L);
+
+        // wait until entry is marked for deletion
+        TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
+            "Failed to mark cache entry for cleanup after resizing cache.");
+        TestUtils.waitForCondition(cacheEntry::isCleanStarted,
+            "Failed to cleanup cache entry after resizing cache.");
+
+        // verify no index files on remote cache dir
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
+            "Offset index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
+            "Txn index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
+            "Time index file should not be present on disk at " + cache.cacheDir());
+        TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+            "Index file marked for deletion should not be present on disk at " + cache.cacheDir());
+
+        assertCacheSize(0);
+
+        // Increase cache capacity to only store 2 entries
+        cache.resizeCacheSize(2 * estimateEntryBytesSize);
+        assertCacheSize(0);
+
+        RemoteIndexCache.Entry entry0 = cache.getIndexEntry(metadataList.get(0));
+        RemoteIndexCache.Entry entry1 = cache.getIndexEntry(metadataList.get(1));
+        RemoteIndexCache.Entry entry2 = cache.getIndexEntry(metadataList.get(2));
+        List<RemoteIndexCache.Entry> entries = List.of(entry0, entry1, entry2);
+        assertCacheSize(2);
+        EvictionResult result = verifyEntryIsEvicted(metadataList, entries, 1);
+
+        // Reduce cache capacity to only store 1 entry
+        cache.resizeCacheSize(1 * estimateEntryBytesSize);
+        assertCacheSize(1);
+        // After resize, we need to check an entry is deleted from cache and the existing segmentMetadata
+        List<RemoteIndexCache.Entry> entryInCache = entries.stream().filter(e -> !result.evictedEntries.contains(e)).toList();
+        List<RemoteLogSegmentMetadata> updatedSegmentMetadata = metadataList.stream().filter(e -> !result.evictedSegmentMetadata.contains(e)).toList();
+        verifyEntryIsEvicted(updatedSegmentMetadata, entryInCache, 1);
+
+        // resize to the same size, all entries should be kept
+        cache.resizeCacheSize(1 * estimateEntryBytesSize);
+
+        List<RemoteLogSegmentMetadata> entriesKept = getRemoteLogSegMetadataIsKept(metadataList);
+        // verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
+        verifyEntryIsKept(entriesKept);
+        assertCacheSize(1);
+
+        // increase the size
+        cache.resizeCacheSize(2 * estimateEntryBytesSize);
+
+        // verify all entries are kept
+        verifyEntryIsKept(entriesKept);
+        assertCacheSize(1);
+    }
+
+    private List<RemoteLogSegmentMetadata> getRemoteLogSegMetadataIsKept(List<RemoteLogSegmentMetadata> metadataToVerify) {
+        return metadataToVerify
+            .stream()
+            .filter(s -> cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id()))
+            .toList();
+    }
+
+    record EvictionResult(List<RemoteLogSegmentMetadata> evictedSegmentMetadata, List<RemoteIndexCache.Entry> evictedEntries) { }
+
+    private EvictionResult verifyEntryIsEvicted(List<RemoteLogSegmentMetadata> metadataToVerify, List<RemoteIndexCache.Entry> entriesToVerify, int numOfMarkAsDeleted) throws InterruptedException {
+        TestUtils.waitForCondition(() -> entriesToVerify.stream().filter(RemoteIndexCache.Entry::isMarkedForCleanup).count() == numOfMarkAsDeleted,
+            "Failed to mark evicted cache entry for cleanup after resizing cache.");
+
+        TestUtils.waitForCondition(() -> entriesToVerify.stream().filter(RemoteIndexCache.Entry::isCleanStarted).count() == numOfMarkAsDeleted,
+            "Failed to cleanup evicted cache entry after resizing cache.");
+
+        List<RemoteIndexCache.Entry> entriesIsMarkedForCleanup = entriesToVerify.stream().filter(RemoteIndexCache.Entry::isMarkedForCleanup).toList();
+        List<RemoteIndexCache.Entry> entriesIsCleanStarted = entriesToVerify.stream().filter(RemoteIndexCache.Entry::isCleanStarted).toList();
+        // clean up entries and clean start entries should be the same
+        assertEquals(entriesIsMarkedForCleanup, entriesIsCleanStarted);
+
+        // get the logSegMetadata that are evicted
+        List<RemoteLogSegmentMetadata> metadataDeleted = metadataToVerify
+            .stream()
+            .filter(s -> !cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id()))
+            .toList();
+        assertEquals(numOfMarkAsDeleted, metadataDeleted.size());
+        for (RemoteLogSegmentMetadata metadata : metadataDeleted) {
+            // verify no index files for `entryToVerify` on remote cache dir
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isEmpty(),
+                "Offset index file for evicted entry should not be present on disk at " + cache.cacheDir());
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isEmpty(),
+                "Time index file for evicted entry should not be present on disk at " + cache.cacheDir());
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isEmpty(),
+                "Txn index file for evicted entry should not be present on disk at " + cache.cacheDir());
+            TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
+                "Index file marked for deletion for evicted entry should not be present on disk at " + cache.cacheDir());
+        }
+        return new EvictionResult(metadataDeleted, entriesIsMarkedForCleanup);
+    }
+
+    private void verifyEntryIsEvicted(RemoteLogSegmentMetadata metadataToVerify, RemoteIndexCache.Entry entryToVerify) throws InterruptedException {

Review Comment:
   It looks like I messed up while rebasing: I had started working on this weeks ago, the method was updated in the meantime, and it looks like I kept both. Removing.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org