mimaison commented on code in PR #19469: URL: https://github.com/apache/kafka/pull/19469#discussion_r2042607005
########## storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java: ########## @@ -0,0 +1,1207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.DIR_NAME; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFile; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteOffsetIndexFileName; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFile; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTimeIndexFileName; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFile; +import static org.apache.kafka.storage.internals.log.RemoteIndexCache.remoteTransactionIndexFileName; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static 
org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class RemoteIndexCacheTest { + + private final long defaultRemoteIndexCacheSizeBytes = 1024 * 1024L; + private final Logger logger = LoggerFactory.getLogger(RemoteIndexCacheTest.class); + private final MockTime time = new MockTime(); + private final int brokerId = 1; + private final long baseOffset = Integer.MAX_VALUE + 101337L; // start with a base offset which is a long + private final long lastOffset = baseOffset + 30L; + private final int segmentSize = 1024; + private final RemoteStorageManager rsm = mock(RemoteStorageManager.class); + private RemoteIndexCache cache; + private RemoteLogSegmentMetadata rlsMetadata; + private File logDir; + private File tpDir; + private TopicIdPartition idPartition; + + @BeforeEach + public void setup() throws IOException, RemoteStorageException { + idPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + logDir = TestUtils.tempDirectory("kafka-" + this.getClass().getSimpleName()); + tpDir = new File(logDir, idPartition.toString()); + Files.createDirectory(tpDir.toPath()); + + RemoteLogSegmentId remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition); + rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L)); + + cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString()); + + mockRsmFetchIndex(rsm); + } + + @AfterEach + public void cleanup() { + reset(rsm); + // the files created for the test will be deleted automatically on thread exit since we use temp dir + Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); + // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory + // will be deleted at the end of test. + try { + Utils.delete(logDir); + } catch (IOException ioe) { + // ignore + } + // Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach + // because this may throw an exception and prevent cleanup after it + TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true); + } + + @Test + public void testIndexFileNameAndLocationOnDisk() { + RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata); + Path offsetIndexFile = entry.offsetIndex().file().toPath(); + Path txnIndexFile = entry.txnIndex().file().toPath(); + Path timeIndexFile = entry.timeIndex().file().toPath(); + + String expectedOffsetIndexFileName = remoteOffsetIndexFileName(rlsMetadata); + String expectedTimeIndexFileName = remoteTimeIndexFileName(rlsMetadata); + String expectedTxnIndexFileName = remoteTransactionIndexFileName(rlsMetadata); + + assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName().toString()); + assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName().toString()); + assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName().toString()); + + // assert that parent directory for the index files is correct + assertEquals(DIR_NAME, offsetIndexFile.getParent().getFileName().toString(), + "offsetIndex=" + offsetIndexFile + " is created under incorrect parent"); + assertEquals(DIR_NAME, txnIndexFile.getParent().getFileName().toString(), + 
"txnIndex=" + txnIndexFile + " is created under incorrect parent"); + assertEquals(DIR_NAME, timeIndexFile.getParent().getFileName().toString(), + "timeIndex=" + timeIndexFile + " is created under incorrect parent"); + } + + @Test + public void testFetchIndexFromRemoteStorage() throws RemoteStorageException { + OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex(); + OffsetPosition offsetPosition1 = offsetIndex.entry(1); + // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once + int resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset); + assertEquals(offsetPosition1.position, resultPosition); + verifyFetchIndexInvocation(1, List.of(IndexType.OFFSET, IndexType.TIMESTAMP)); + + // this should not cause fetching index from RemoteStorageManager as it is already fetched earlier + reset(rsm); + OffsetPosition offsetPosition2 = offsetIndex.entry(2); + int resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset); + assertEquals(offsetPosition2.position, resultPosition2); + assertNotNull(cache.getIndexEntry(rlsMetadata)); + verifyNoInteractions(rsm); + } + + @Test + public void testFetchIndexForMissingTransactionIndex() throws RemoteStorageException { + when(rsm.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))).thenAnswer(ans -> { + RemoteLogSegmentMetadata metadata = ans.getArgument(0); + IndexType indexType = ans.getArgument(1); + OffsetIndex offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir); + TimeIndex timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir); + maybeAppendIndexEntries(offsetIdx, timeIdx); + return switch (indexType) { + case OFFSET -> new FileInputStream(offsetIdx.file()); + case TIMESTAMP -> new FileInputStream(timeIdx.file()); + // Throw RemoteResourceNotFoundException since transaction index is not available + case TRANSACTION -> throw new RemoteResourceNotFoundException("txn index not found"); + case LEADER_EPOCH -> null; // 
leader-epoch-cache is not accessed. + case PRODUCER_SNAPSHOT -> null; // producer-snapshot is not accessed. + }; + }); + + RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata); + // Verify an empty file is created in the cache directory + assertTrue(entry.txnIndex().file().exists()); + assertEquals(0, entry.txnIndex().file().length()); + } + + @Test + public void testPositionForNonExistingIndexFromRemoteStorage() { + OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex(); + int lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset()); + long greaterOffsetThanLastOffset = offsetIndex.lastOffset() + 1; + assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset)); + + // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than the last entry in the offset index. + OffsetPosition nonExistentOffsetPosition = new OffsetPosition(baseOffset, 0); + long lowerOffsetThanBaseOffset = offsetIndex.baseOffset() - 1; + assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset)); + } + + @Test + public void testCacheEntryExpiry() throws IOException, RemoteStorageException, InterruptedException { + long estimateEntryBytesSize = estimateOneEntryBytesSize(); + // close existing cache created in test setup before creating a new one + Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); + cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString()); + TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId); + + assertCacheSize(0); + // getIndex for first time will call rsm#fetchIndex + cache.getIndexEntry(metadataList.get(0)); + assertCacheSize(1); + // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache + 
cache.getIndexEntry(metadataList.get(0)); + assertCacheSize(1); + verifyFetchIndexInvocation(1); + + // Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, making the count to 2 + cache.getIndexEntry(metadataList.get(0)); + cache.getIndexEntry(metadataList.get(1)); + assertCacheSize(2); + verifyFetchIndexInvocation(2); + + // Getting index for metadataList.last should call rsm#fetchIndex + // to populate this entry one of the other 2 entries will be evicted. We don't know which one since it's based on + // a probabilistic formula for Window TinyLfu. See docs for RemoteIndexCache + int size = metadataList.size(); + assertNotNull(cache.getIndexEntry(metadataList.get(size - 1))); + assertAtLeastOnePresent(cache, metadataList.get(1).remoteLogSegmentId().id(), metadataList.get(0).remoteLogSegmentId().id()); + assertCacheSize(2); + verifyFetchIndexInvocation(3); + + // getting index for last expired entry should call rsm#fetchIndex as that entry was expired earlier + Optional<RemoteLogSegmentMetadata> missingEntryOpt = Optional.empty(); + for (RemoteLogSegmentMetadata entry : metadataList) { + Uuid segmentId = entry.remoteLogSegmentId().id(); + if (!cache.internalCache().asMap().containsKey(segmentId)) { + missingEntryOpt = Optional.of(entry); + break; + } + } + assertFalse(missingEntryOpt.isEmpty()); + cache.getIndexEntry(missingEntryOpt.get()); + assertCacheSize(2); + verifyFetchIndexInvocation(4); + } + + @Test + public void testGetIndexAfterCacheClose() throws IOException, RemoteStorageException, InterruptedException { + // close existing cache created in test setup before creating a new one + Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); + + cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString()); + TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId); + + 
assertCacheSize(0); + cache.getIndexEntry(metadataList.get(0)); + assertCacheSize(1); + verifyFetchIndexInvocation(1); + + cache.close(); + + // Check IllegalStateException is thrown when index is accessed after it is closed. + assertThrows(IllegalStateException.class, () -> cache.getIndexEntry(metadataList.get(0))); + } + + @Test + public void testCloseIsIdempotent() throws IOException { + // generate and add entry to cache + RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry(); + cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry); + + cache.close(); + cache.close(); + + // verify that entry is only closed once + verify(spyEntry).close(); + } + + @Test + public void testCacheEntryIsDeletedOnRemoval() throws IOException, InterruptedException { + Uuid internalIndexKey = rlsMetadata.remoteLogSegmentId().id(); + RemoteIndexCache.Entry cacheEntry = generateSpyCacheEntry(); + + // verify index files on disk + assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), "Offset index file should be present on disk at " + tpDir.toPath()); + assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), "Txn index file should be present on disk at " + tpDir.toPath()); + assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), "Time index file should be present on disk at " + tpDir.toPath()); + + // add the spied entry into the cache, it will overwrite the non-spied entry + cache.internalCache().put(internalIndexKey, cacheEntry); + + // no expired entries yet + assertEquals(0, cache.expiredIndexes().size(), "expiredIndex queue should be zero at start of test"); + + // call remove function to mark the entry for removal + cache.remove(internalIndexKey); + + // wait until entry is marked for deletion + TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup, + "Failed to mark cache entry for cleanup after invalidation"); + TestUtils.waitForCondition(cacheEntry::isCleanStarted, + "Failed 
to cleanup cache entry after invalidation"); + + // first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called + verify(cacheEntry, times(2)).markForCleanup(); + // after that async it will be cleaned up + verify(cacheEntry).cleanup(); + + // verify that index(s) rename is only called 1 time + verify(cacheEntry.timeIndex()).renameTo(any(File.class)); + verify(cacheEntry.offsetIndex()).renameTo(any(File.class)); + verify(cacheEntry.txnIndex()).renameTo(any(File.class)); + + // verify no index files on disk + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), + "Offset index file should not be present on disk at " + tpDir.toPath()); + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), + "Txn index file should not be present on disk at " + tpDir.toPath()); + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), + "Time index file should not be present on disk at " + tpDir.toPath()); + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(), + "Index file marked for deletion should not be present on disk at " + tpDir.toPath()); + } + + private Optional<Path> getIndexFileFromDisk(String suffix) throws IOException { + return Files.walk(tpDir.toPath()) + .filter(Files::isRegularFile) + .filter(path -> path.getFileName().toString().endsWith(suffix)) + .findAny(); + } + + @Test + public void testCleanerThreadShutdown() throws IOException, InterruptedException { Review Comment: The logic in this test is a bit different from the original. From my testing, the original test did not actually throw the `RuntimeException` to validate that the cleaner thread could handle it, as `cleanup()` is effectively only called from `remove()` and not by invalidating the entry in the underlying cache. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org