satishd commented on code in PR #19462:
URL: https://github.com/apache/kafka/pull/19462#discussion_r2048400153


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -208,46 +203,32 @@ public void removeAll(Collection<Uuid> keys) {
         }
     }
 
-    private void enqueueEntryForCleanup(Entry entry, Uuid key) {
+    private void enqueueEntryForCleanup(Entry entry) {
         try {
             entry.markForCleanup();
-            if (!expiredIndexes.offer(entry)) {
-                log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
-            }
+            Runnable runnable = () -> {
+                try {
+                    log.debug("Cleaning up index entry {}", entry);
+                    entry.cleanup();
+                } catch (Exception ex) {
+                    // do not exit for exceptions other than InterruptedException
+                    log.error("Error occurred while cleaning up expired entry", ex);
+                }
+            };
+            cleanerScheduler.scheduleOnce("delete-index", runnable, fileDeleteDelayMs);

Review Comment:
   Good catch on the earlier issue, and a nice fix using a scheduled delay like we do with local segments.
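For readers following along, the pattern the hunk above adopts can be sketched with the standard `ScheduledExecutorService` (which is what Kafka's `KafkaScheduler` wraps): instead of pushing expired entries onto an unbounded queue drained by a dedicated cleaner thread, mark the entry and schedule a one-shot delayed deletion task. The `Entry` class below is a hypothetical stand-in for `RemoteIndexCache.Entry`, and the short delay replaces the PR's 60-second `fileDeleteDelayMs`; this is a minimal sketch, not the actual Kafka code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedCleanupSketch {
    // Hypothetical stand-in for RemoteIndexCache.Entry
    static class Entry {
        volatile boolean markedForCleanup = false;
        volatile boolean cleaned = false;
        void markForCleanup() { markedForCleanup = true; }
        void cleanup() { cleaned = true; } // real code would delete index files on disk
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long fileDeleteDelayMs = 100; // 60_000 in the PR; shortened for the demo
        Entry entry = new Entry();
        CountDownLatch done = new CountDownLatch(1);

        // Mirrors enqueueEntryForCleanup: mark first, then schedule a one-shot deletion
        entry.markForCleanup();
        scheduler.schedule(() -> {
            try {
                entry.cleanup();
            } catch (Exception ex) {
                // swallow so one failed cleanup doesn't kill the scheduler thread
            } finally {
                done.countDown();
            }
        }, fileDeleteDelayMs, TimeUnit.MILLISECONDS);

        done.await(5, TimeUnit.SECONDS);
        System.out.println("cleaned=" + entry.cleaned);
        scheduler.shutdown();
    }
}
```

The delay gives in-flight readers a grace period before the underlying index files disappear, mirroring how local log segments are deleted after `file.delete.delay.ms`.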



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -91,18 +90,14 @@ public class RemoteIndexCache implements Closeable {
      */
     private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
 
-    /**
-     * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
-     */
-    private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
-
     /**
      * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
      * concurrent reads in-progress.
      */
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private final KafkaScheduler cleanerScheduler = new KafkaScheduler(1, true, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD);
+    private int fileDeleteDelayMs = 60_000;

Review Comment:
   I think the value of 60 seconds is reasonable. We can revisit adding a config for it if it's really needed in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
