nicktelford commented on code in PR #18732: URL: https://github.com/apache/kafka/pull/18732#discussion_r1933576571
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -295,6 +294,9 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
         // now that we have exclusive ownership of the drained tasks, close them
         for (final Task task : drainedTasks) {
+            // main thread locked the task initially on startup, but has moved on and will not unlock
+            // so we need to explicitly swap lock ownership here as this method is called by a StreamThread
+            lockedTasksToOwner.replace(task.id(), Thread.currentThread());

Review Comment:
@mjsax I figured out how a `StreamThread` ends up calling `StateDirectory#closeStartupTasks()`: the call comes from the `StreamThread.StateListener`. Although that listener is defined in `KafkaStreams`, it is registered with every `StreamThread` and `GlobalStreamThread`, and `onChange` is invoked by those threads.

The reason it triggers here (but was not caught in the unit/integration tests) must be that some local startup tasks were left over after the assignment completed. When the assignment completes, we close any remaining startup tasks because they're no longer needed.

I also found three other ways that `closeStartupTasks()` could be called from a thread other than the one that originally locked the tasks, each of which would trigger the same problem:

1. During shutdown, `StateDirectory.close()` is called by a ["shutdown helper thread"](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1470) that `KafkaStreams` creates to handle the shutdown process. This would cause the same hang if the application is shut down before it has transitioned to the `RUNNING` state.
2. The state cleaner thread attempts to clean out an empty NamedTopology directory, which also calls `closeStartupTasks()`.
3. The user calls `KafkaStreams.cleanUp()` from a different thread than the one that called `KafkaStreams.start()`.
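To make the failure mode concrete, here is a minimal sketch of an owner-checked lock map, assuming (as a simplification) that an unlock from a non-owning thread is silently ignored. `OwnerLocks`, `takeOwnership` and `demo` are hypothetical names for illustration, not the real `StateDirectory` API; only the `lockedTasksToOwner.replace(...)` call mirrors the diff.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the owner-checked task locks; NOT the real StateDirectory.
final class OwnerLocks {
    // task id -> thread that currently owns the lock
    private final ConcurrentMap<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

    // Acquire the lock for the calling thread; succeeds if it is free or already ours.
    boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.putIfAbsent(taskId, Thread.currentThread());
        return owner == null || owner == Thread.currentThread();
    }

    // Release only if the calling thread is the recorded owner; otherwise do nothing.
    boolean unlock(final String taskId) {
        return lockedTasksToOwner.remove(taskId, Thread.currentThread());
    }

    // Hand ownership to the calling thread, like the line added in the diff.
    void takeOwnership(final String taskId) {
        lockedTasksToOwner.replace(taskId, Thread.currentThread());
    }

    // One thread locks, a second thread tries to unlock before and after the swap.
    static boolean[] demo() {
        final OwnerLocks locks = new OwnerLocks();
        locks.lock("0_0"); // startup: locked by the calling thread
        final boolean[] results = new boolean[2];
        final Thread streamThread = new Thread(() -> {
            results[0] = locks.unlock("0_0"); // not the owner: lock stays held
            locks.takeOwnership("0_0");       // explicit ownership swap
            results[1] = locks.unlock("0_0"); // owner now: lock is released
        }, "stream-thread");
        streamThread.start();
        try {
            streamThread.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(final String[] args) {
        final boolean[] results = demo();
        System.out.println("unlock before swap: " + results[0]);
        System.out.println("unlock after swap: " + results[1]);
    }
}
```

In this sketch the first unlock leaves the entry in place, which is the kind of stuck lock that any later caller would wait on. The same reasoning applies to whichever thread ends up closing the startup tasks: the shutdown helper, the state cleaner, or a user thread.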
Bill's fix covers all of these cases.

---

@bbejeck the solution looks good to me, although I have one minor improvement that you may want to make: instead of adding this line, I think we should change line 289 from:
```java
if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
```
to:
```java
if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
```
This ensures that the lock ownership update logic is encapsulated in a single function, which we should always use for removing tasks from the `tasksForLocalState` Map.
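For illustration, a rough sketch of what routing every removal through one helper could look like. The field names follow the discussion, but `StartupTaskRegistry`, the `String` task type, `addStartupTask`, `ownerOf` and the method bodies are assumptions made for this example, not the actual `StateDirectory` code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified registry; types and method bodies are assumptions.
final class StartupTaskRegistry {
    private final ConcurrentMap<String, String> tasksForLocalState = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

    // Register a startup task and record the calling thread as its lock owner.
    void addStartupTask(final String taskId, final String task) {
        tasksForLocalState.put(taskId, task);
        lockedTasksToOwner.put(taskId, Thread.currentThread());
    }

    // Single removal path: removing a task also moves lock ownership to the caller,
    // so no call site can forget the ownership swap.
    String removeStartupTask(final String taskId) {
        final String task = tasksForLocalState.remove(taskId);
        if (task != null) {
            lockedTasksToOwner.replace(taskId, Thread.currentThread());
        }
        return task;
    }

    Thread ownerOf(final String taskId) {
        return lockedTasksToOwner.get(taskId);
    }

    // Adds on the calling thread, removes on another, and reports whether
    // ownership followed the removing thread.
    static boolean demo() {
        final StartupTaskRegistry registry = new StartupTaskRegistry();
        registry.addStartupTask("0_0", "task-0_0");
        final Thread remover = new Thread(() -> registry.removeStartupTask("0_0"), "stream-thread");
        remover.start();
        try {
            remover.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return registry.ownerOf("0_0") == remover;
    }

    public static void main(final String[] args) {
        System.out.println("ownership moved to remover: " + demo());
    }
}
```

With this shape, `closeStartupTasks()` and any other caller get the ownership transfer for free, rather than each needing its own `replace` call.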