UladzislauBlok commented on code in PR #21566:
URL: https://github.com/apache/kafka/pull/21566#discussion_r2866695077
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -603,6 +603,55 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
maybeCleanEmptyNamedTopologyDirs(true);
}
+ /**
+ * Purges local state directories and checkpoint files during application startup.
+ *
+ * @param dirMaxAgeMs the time-based threshold in milliseconds. Only state directories
+ * and checkpoint files that have not been modified for at least
+ * this amount of time (corresponding to the
+ * {@code state.cleanup.dir.max.age.ms} property) will be removed.
+ */
+ public synchronized void cleanOutdatedDirsOnStartup(final long dirMaxAgeMs) {
+ try {
+ cleanStateAndTaskDirectoriesOnStartup(dirMaxAgeMs);
+ } catch (final Exception e) {
+ throw new StreamsException(e);
+ }
+ }
+
+ private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) throws Exception {
+ if (!lockedTasksToOwner.isEmpty()) {
+ log.warn("Found some still-locked task directories when cleaning outdated directories");
+ }
+ final AtomicReference<Exception> firstException = new AtomicReference<>();
+ for (final TaskDirectory taskDir : listAllTaskDirectories()) {
+ final String dirName = taskDir.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName, taskDir.namedTopology());
+ try {
+ final long now = time.milliseconds();
+ final long lastModifiedMs = taskDir.file().lastModified();
+ if (now - dirMaxAgeMs > lastModifiedMs) {
+ log.info("{} Deleting outdated state directory {} for {} as {}ms has elapsed (max directory age is {}ms).",
+ logPrefix(), dirName, id, now - lastModifiedMs, dirMaxAgeMs);
+
+ if (lockedTasksToOwner.containsKey(id)) {
Review Comment:
Do we need to lock the directory? This cleanup is applied even before the
stream threads are created.
I added this check because `clean` has the same one.
UPD: I see that `clean` (`KafkaStreams#cleanUp`) can be called after shutdown.
I assume the task locking exists for that case, so I removed the check here.
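
For readers following the hunk, the age condition it applies can be isolated as a pure function. This is only a minimal sketch: the class name `OutdatedDirCheck` and the method `isOutdated` are hypothetical and not part of the PR. The comparison itself is copied from the diff.

```java
// Hypothetical helper for illustration; not part of StateDirectory.
public final class OutdatedDirCheck {

    private OutdatedDirCheck() { }

    /**
     * Mirrors the condition in the hunk: a directory is outdated when its
     * last-modified timestamp is strictly older than (now - dirMaxAgeMs).
     */
    static boolean isOutdated(final long nowMs,
                              final long lastModifiedMs,
                              final long dirMaxAgeMs) {
        return nowMs - dirMaxAgeMs > lastModifiedMs;
    }
}
```

With `nowMs = 1_000_000` and `dirMaxAgeMs = 600_000`, a directory last modified at `300_000` is outdated, while one last modified at exactly `400_000` is kept, because the comparison is strict.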
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
Review Comment:
I revisited the code once more, and the directory should already be locked.
Cleanup is the first step of Kafka Streams `start`, while locking the directory
is part of Kafka Streams creation:
```
private KafkaStreams(... params) {
...
...
processId = stateDirectory.initializeProcessId(); // here we lock the directory
}
```
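
The ordering described above (lock at construction, cleanup at start) can be sketched with a toy class. Everything here is an assumption made for illustration: `LifecycleSketch`, `findOutdatedOnStart`, and the `.lock` file name are hypothetical, and the sketch only lists outdated directories instead of deleting them. It is not the Kafka Streams implementation.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical toy model of "lock in the constructor, clean on start". */
public final class LifecycleSketch implements AutoCloseable {
    private final Path stateDir;
    private final FileChannel lockChannel;
    private final FileLock dirLock;

    // Construction phase: take the directory-level lock before anything else runs.
    public LifecycleSketch(final Path stateDir) throws IOException {
        this.stateDir = Files.createDirectories(stateDir);
        this.lockChannel = FileChannel.open(
            stateDir.resolve(".lock"), // assumed lock file name
            StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.dirLock = lockChannel.tryLock(); // null if another process holds the lock
        if (dirLock == null) {
            lockChannel.close();
            throw new IOException("State directory is locked by another process: " + stateDir);
        }
    }

    // Start phase: the lock is already held, so only the age check remains.
    public List<String> findOutdatedOnStart(final long nowMs, final long dirMaxAgeMs)
            throws IOException {
        if (!dirLock.isValid()) {
            throw new IllegalStateException("Directory lock is not held");
        }
        try (Stream<Path> children = Files.list(stateDir)) {
            return children
                .filter(Files::isDirectory)
                // Same strict comparison as in the reviewed hunk.
                .filter(p -> nowMs - dirMaxAgeMs > p.toFile().lastModified())
                .map(p -> p.getFileName().toString())
                .sorted()
                .collect(Collectors.toList());
        }
    }

    @Override
    public void close() throws IOException {
        dirLock.release();
        lockChannel.close();
    }
}
```

Because the lock is acquired in the constructor, any caller that reaches `findOutdatedOnStart` on a live instance already owns the directory, which is the point the comment makes about the startup cleanup.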