UladzislauBlok commented on code in PR #21566:
URL: https://github.com/apache/kafka/pull/21566#discussion_r2877347183


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -603,6 +603,55 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
         maybeCleanEmptyNamedTopologyDirs(true);
     }
 
+    /**
+     * Purges local state directories and checkpoint files during application 
startup.
+     *
+     * @param dirMaxAgeMs the time-based threshold in milliseconds. Only state 
directories
+     * and checkpoint files that have not been modified for at least
+     * this amount of time (corresponding to the
+     * {@code state.cleanup.dir.max.age.ms} property) will be removed.
+     */
+    public synchronized void cleanOutdatedDirsOnStartup(final long 
dirMaxAgeMs) {
+        try {
+            cleanStateAndTaskDirectoriesOnStartup(dirMaxAgeMs);
+        } catch (final Exception e) {
+            throw new StreamsException(e);
+        }
+    }
+
+    private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) 
throws Exception {
+        if (!lockedTasksToOwner.isEmpty()) {
+            log.warn("Found some still-locked task directories when cleaning 
outdated directories");

Review Comment:
   Yeah, I had the same assumption, so I removed the checked



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to