mjsax commented on code in PR #21566:
URL: https://github.com/apache/kafka/pull/21566#discussion_r2876384705


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -603,6 +603,55 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
         maybeCleanEmptyNamedTopologyDirs(true);
     }
 
+    /**
+     * Purges local state directories and checkpoint files during application 
startup.
+     *
+     * @param dirMaxAgeMs the time-based threshold in milliseconds. Only state 
directories
+     * and checkpoint files that have not been modified for at least
+     * this amount of time (corresponding to the
+     * {@code state.cleanup.dir.max.age.ms} property) will be removed.
+     */
+    public synchronized void cleanOutdatedDirsOnStartup(final long 
dirMaxAgeMs) {
+        try {
+            cleanStateAndTaskDirectoriesOnStartup(dirMaxAgeMs);
+        } catch (final Exception e) {
+            throw new StreamsException(e);
+        }
+    }
+
+    private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) 
throws Exception {
+        if (!lockedTasksToOwner.isEmpty()) {
+            log.warn("Found some still-locked task directories when cleaning 
outdated directories");
+        }
+        final AtomicReference<Exception> firstException = new 
AtomicReference<>();
+        for (final TaskDirectory taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.file().getName();
+            final TaskId id = parseTaskDirectoryName(dirName, 
taskDir.namedTopology());
+            try {
+                final long now = time.milliseconds();
+                final long lastModifiedMs = taskDir.file().lastModified();
+                if (now - dirMaxAgeMs > lastModifiedMs) {
+                    log.info("{} Deleting outdated state directory {} for {} 
as {}ms has elapsed (max directory age is {}ms).",
+                            logPrefix(), dirName, id, now - lastModifiedMs, 
dirMaxAgeMs);
+
+                    if (lockedTasksToOwner.containsKey(id)) {
+                        log.warn("{} Task {} in state directory {} was still 
locked by {}",
+                                logPrefix(), dirName, id, 
lockedTasksToOwner.get(id));
+                    }
+                    Utils.delete(taskDir.file());
+                }
+            } catch (final IOException exception) {
+                log.error("{} Failed to delete task directory {} for {} with 
exception:", logPrefix(), dirName, id, exception);
+                firstException.compareAndSet(null, exception);

Review Comment:
   Fine with me -- was just an idea to simplify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to