mjsax commented on code in PR #18732:
URL: https://github.com/apache/kafka/pull/18732#discussion_r1933071908

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -295,6 +294,9 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
             // now that we have exclusive ownership of the drained tasks, close them
             for (final Task task : drainedTasks) {
+                // main thread locked the task initially on startup, but has moved on and will not unlock
+                // so we need to explicitly swap lock ownership here as this method is called by a StreamThread
+                lockedTasksToOwner.replace(task.id(), Thread.currentThread());

Review Comment:
   I understand that `StandbyTask.closeClean()` would block, as it cannot get the lock. I guess this only happens on the `StandbyTask.closeClean()` path, because "startup tasks" are always standbys, right? So it can never happen that we call `StreamTask.closeClean()` to clean up an unused "startup task".

   However, I am not sure I understand this fix. Transferring ownership from the main thread to a `StreamThread` implies that a `StreamThread` would call `closeStartupTasks(...)` -- I could not find the code that would result in this call. It seems `closeStartupTasks(...)` is only called from the `KafkaStreams` instance, i.e., the main thread, with the exception of the background state-dir-cleaner thread.

   Can you help me out with it?


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -107,7 +106,7 @@ public StateDirectoryProcessFile() {
     private final boolean hasPersistentStores;
     private final boolean hasNamedTopologies;
-    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
+    private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

Review Comment:
   I think it was already used across threads before... `TaskManager` actually calls `StateDirectory#removeStartupTask(...)`.

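
For readers following the ownership question, here is a minimal sketch of the pattern under discussion: a lock map keyed by task id whose value is the owning thread, where a second thread can only release a lock after ownership has been swapped to it. The class `TaskLockSketch`, its methods, and the `String` task ids are all hypothetical and simplified; this is not the actual `StateDirectory` code, and it says nothing about which thread really calls `closeStartupTasks(...)`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a per-task lock registry; NOT the real StateDirectory.
class TaskLockSketch {
    // Maps a task id to the thread that currently owns its lock.
    private final ConcurrentMap<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

    // Returns true if the calling thread now owns (or already owned) the lock.
    boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.putIfAbsent(taskId, Thread.currentThread());
        return owner == null || owner == Thread.currentThread();
    }

    // Only the owning thread may unlock; returns true if the lock was released.
    boolean unlock(final String taskId) {
        return lockedTasksToOwner.remove(taskId, Thread.currentThread());
    }

    // Reassigns an existing lock to the calling thread; false if the task is not locked.
    boolean transferToCurrentThread(final String taskId) {
        return lockedTasksToOwner.replace(taskId, Thread.currentThread()) != null;
    }

    // Runs unlock() on a separate thread, optionally swapping ownership first.
    static boolean unlockFromOtherThread(final TaskLockSketch locks,
                                         final String taskId,
                                         final boolean transferFirst) {
        final AtomicBoolean released = new AtomicBoolean(false);
        final Thread other = new Thread(() -> {
            if (transferFirst) {
                locks.transferToCurrentThread(taskId);
            }
            released.set(locks.unlock(taskId));
        });
        other.start();
        try {
            other.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return released.get();
    }

    public static void main(final String[] args) {
        final TaskLockSketch locks = new TaskLockSketch();
        locks.lock("0_0"); // locked by the main thread
        System.out.println(unlockFromOtherThread(locks, "0_0", false)); // prints false
        System.out.println(unlockFromOtherThread(locks, "0_0", true));  // prints true
    }
}
```

In this sketch the swap only matters when the thread that closes the task differs from the thread that locked it, which is the point the first review comment asks about. A concurrent map is used because the map is read and written from more than one thread.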