satishd commented on code in PR #19462: URL: https://github.com/apache/kafka/pull/19462#discussion_r2048400153
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -208,46 +203,32 @@ public void removeAll(Collection<Uuid> keys) {
         }
     }
-    private void enqueueEntryForCleanup(Entry entry, Uuid key) {
+    private void enqueueEntryForCleanup(Entry entry) {
         try {
             entry.markForCleanup();
-            if (!expiredIndexes.offer(entry)) {
-                log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
-            }
+            Runnable runnable = () -> {
+                try {
+                    log.debug("Cleaning up index entry {}", entry);
+                    entry.cleanup();
+                } catch (Exception ex) {
+                    // do not exit for exceptions other than InterruptedException
+                    log.error("Error occurred while cleaning up expired entry", ex);
+                }
+            };
+            cleanerScheduler.scheduleOnce("delete-index", runnable, fileDeleteDelayMs);

Review Comment:
   Good catch on the earlier issue, and good call fixing it with a scheduled delay, as we do for local segments.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -91,18 +90,14 @@ public class RemoteIndexCache implements Closeable {
      */
     private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
-    /**
-     * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
-     */
-    private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
-
     /**
      * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
      * concurrent reads in-progress.
      */
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private final KafkaScheduler cleanerScheduler = new KafkaScheduler(1, true, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD);
+    private int fileDeleteDelayMs = 60_000;

Review Comment:
   I think 60 seconds is a reasonable value. We can revisit making it configurable if that is really needed in the future.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
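
For readers following the first review comment, the general idea of replacing a drain-the-queue cleaner thread with one delayed task per evicted entry can be sketched roughly as below. This is only an illustration under stated assumptions: it uses the JDK's `ScheduledExecutorService` rather than Kafka's `KafkaScheduler`, and the class `DelayedCleanupSketch`, its methods, and the thread name are hypothetical names that do not appear in the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scheduler-based cleanup discussed in the review.
// JDK-only: KafkaScheduler and RemoteIndexCache.Entry are intentionally not used here.
public class DelayedCleanupSketch {
    // Single daemon thread, mirroring the "one background cleaner" idea.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "sketch-index-cleaner");
            t.setDaemon(true); // a daemon thread does not keep the JVM alive
            return t;
        });
    private final long deleteDelayMs;
    private final AtomicInteger failures = new AtomicInteger();

    public DelayedCleanupSketch(long deleteDelayMs) {
        this.deleteDelayMs = deleteDelayMs;
    }

    // Schedule one delayed cleanup task per evicted entry.
    public void enqueueForCleanup(Runnable cleanup) {
        scheduler.schedule(() -> {
            try {
                cleanup.run();
            } catch (Exception ex) {
                // Count the failure and keep going, so one bad entry
                // does not stop later cleanups.
                failures.incrementAndGet();
            }
        }, deleteDelayMs, TimeUnit.MILLISECONDS);
    }

    public int failures() {
        return failures.get();
    }

    // shutdown() still lets already scheduled delayed tasks run (JDK default policy);
    // wait a bounded time for them to finish.
    public void close() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch each cleanup runs once after the configured delay, and an exception thrown by one cleanup is recorded without affecting the others. How `KafkaScheduler` handles shutdown of pending tasks may differ and is not modeled here.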