showuon commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1357702660
########## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java ##########

@@ -137,13 +137,10 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,

     public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
         lock.writeLock().lock();
         try {
-            // When resizing the cache, we always start with an empty cache. There are two main reasons:
-            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
+            // When resizing the cache, we always start with an empty cache.
+            // Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
             // cache to the new cache in time when resizing inside.
-            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-            // will be inconsistent.
-            internalCache.invalidateAll();
+            removeAll(internalCache.asMap().keySet());

Review Comment:
OK, we can keep this change; there will be an update to this in https://github.com/apache/kafka/pull/14511.
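The rationale for replacing `internalCache.invalidateAll()` with an explicit `removeAll(...)` can be sketched with a plain `ConcurrentHashMap` stand-in (the `Entry` class and all names below are simplified illustrations, not Kafka's actual types): a per-key `compute` removal runs the entry's cleanup synchronously inside the removal itself, whereas Caffeine's `invalidateAll()` notifies its removal listener asynchronously, possibly after the resized cache is already in use.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ResizeSketch {

    // Simplified stand-in for RemoteIndexCache.Entry; the real markForCleanup()
    // renames the entry's index files so they can be deleted later.
    static class Entry {
        volatile boolean markedForCleanup = false;
        void markForCleanup() { markedForCleanup = true; }
    }

    static final ConcurrentHashMap<String, Entry> internalCache = new ConcurrentHashMap<>();

    // Synchronous drain: each entry's cleanup runs inside compute(), so every
    // cleanup has completed by the time removeAll() returns. Caffeine's
    // invalidateAll(), by contrast, may notify its removal listener later on a
    // background executor, which is why removing entries explicitly before
    // swapping in the resized cache avoids inconsistent state.
    static void removeAll(Set<String> keys) {
        for (String key : keys) {
            internalCache.compute(key, (k, entry) -> {
                if (entry != null) {
                    entry.markForCleanup();
                }
                return null; // returning null drops the mapping
            });
        }
    }

    public static void main(String[] args) {
        Entry a = new Entry();
        Entry b = new Entry();
        internalCache.put("a", a);
        internalCache.put("b", b);
        removeAll(internalCache.keySet());
        // prints "true": cache is empty and both cleanups already ran
        System.out.println(internalCache.isEmpty() && a.markedForCleanup && b.markedForCleanup);
    }
}
```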
cc @hudeqi

########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala ##########

@@ -592,16 +593,73 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }

+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    doAnswer((invocation: InvocationOnMock) => {
+      // Signal the CacheRead to unblock itself
+      latchForCacheRead.countDown()
+      // Wait for signal to start renaming the files
+      latchForCacheRemove.await()
+      // Call the actual markForCleanup() method to start renaming the files
+      invocation.callRealMethod()
+      // Signal TestWait to unblock itself so that the test can complete
+      latchForTestWait.countDown()
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for signal to start CacheRead
+      latchForCacheRead.await()
+      cache.getIndexEntry(rlsMetadata)
+      // Signal the CacheRemove to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.submit(removeCache: Runnable)
+      executor.submit(readCache: Runnable)
+
+      // Wait for signal to complete the test
+      latchForTestWait.await()
+      val entry = cache.getIndexEntry(rlsMetadata)
+      assertTrue(Files.exists(entry.offsetIndex().file().toPath))

Review Comment:
We need to verify something between L644 and L645: after the two threads have completed, what state are we expecting? My understanding is that `spyEntry.markForCleanup` will get invoked before `cache.getIndexEntry` for the same entry. That means the entry update in `getIndexEntry` has to wait until the entry removal completes, since they are both atomic operations. In that case, after L644 the new entry should already exist in both the cache and storage, so we don't actually need the `getIndexEntry` call on L645. Is my understanding correct?

########## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java ##########

@@ -157,9 +154,9 @@ private Cache<Uuid, Entry> initEmptyCache(long maxSize) {
             .weigher((Uuid key, Entry entry) -> {
                 return (int) entry.entrySizeBytes;
             })
-            // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
+            // evictionListener is invoked when either the entry is invalidated (means manual removal by the caller) or
             // evicted (means removal due to the policy)

Review Comment:
evictionListener will only be invoked when the entry is evicted, as [the doc](https://github.com/ben-manes/caffeine/wiki/Removal) says:

> When the operation must be performed synchronously with eviction, use Caffeine.evictionListener(RemovalListener) instead. This listener will only be notified when RemovalCause.wasEvicted() is true. For an explicit removal, Cache.asMap() offers compute methods that are performed atomically.
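The per-key atomicity discussed in the review comments above (a removal and a reload of the same entry cannot interleave when both go through the map's compute methods) can be sketched with a plain `ConcurrentHashMap`, whose `compute`/`computeIfAbsent` offer the same guarantees that Caffeine's `Cache.asMap()` view builds on. All names below are illustrative stand-ins, not Kafka's types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AtomicRemoveReadSketch {

    // Runs a removal and a reload of the same key on two threads. Because both
    // go through ConcurrentHashMap's compute methods, they are serialized per
    // key: the reload observes either "no entry" or the fresh entry, never a
    // half-removed one.
    static String raceRemoveThenRead() throws InterruptedException {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("segment-1", "old-entry");

        CountDownLatch removeStarted = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Thread 1: remove the entry; the remapping function holds the per-key
        // lock for its whole duration (the sleep stands in for the slow file
        // renames done by markForCleanup in the real cache).
        pool.submit(() -> cache.compute("segment-1", (k, v) -> {
            removeStarted.countDown();
            try { Thread.sleep(100); } catch (InterruptedException e) { }
            return null; // returning null removes the mapping
        }));

        // Thread 2: reload the entry once the removal is in flight;
        // computeIfAbsent blocks until the removal finishes, then inserts.
        pool.submit(() -> {
            try { removeStarted.await(); } catch (InterruptedException e) { }
            cache.computeIfAbsent("segment-1", k -> "new-entry");
        });

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return cache.get("segment-1");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(raceRemoveThenRead()); // prints "new-entry"
    }
}
```

This mirrors the expectation in the test discussion: once the remove and the read have both completed, the fresh entry is deterministically present.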