nicktelford commented on code in PR #18732:
URL: https://github.com/apache/kafka/pull/18732#discussion_r1933576571


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -295,6 +294,9 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
 
             // now that we have exclusive ownership of the drained tasks, close them
             for (final Task task : drainedTasks) {
+                // main thread locked the task initially on startup, but has moved on and will not unlock
+                // so we need to explicitly swap lock ownership here as this method is called by a StreamThread
+                lockedTasksToOwner.replace(task.id(), Thread.currentThread());

Review Comment:
   @mjsax I figured out how `StreamThread` ends up calling
`StateDirectory#closeStartupTasks()`: it's called from the
`StreamThread.StateListener`. Although the listener is defined in
`KafkaStreams`, it is registered with every `StreamThread` and
`GlobalStreamThread`, and its `onChange` method is invoked by those threads.
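   To illustrate why the call lands on a `StreamThread`, here is a minimal,
hypothetical sketch in plain Java (not the real Kafka Streams classes): a
listener object is created on the main thread, the way `KafkaStreams` creates
its `StateListener`, but `onChange` runs synchronously on whichever thread
fires the state change. `ListenerThreadDemo` and the thread name
`stream-thread-1` are illustrative assumptions.

   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;

   // Hypothetical model of the listener pattern; not the real Kafka Streams code.
   public class ListenerThreadDemo {

       // Stand-in for StreamThread.StateListener.
       interface StateListener {
           void onChange(String newState);
       }

       // Records the name of every thread that executed onChange().
       static final List<String> callingThreads = new CopyOnWriteArrayList<>();

       public static void main(final String[] args) throws InterruptedException {
           // The listener object is created on the main thread...
           final StateListener listener =
               newState -> callingThreads.add(Thread.currentThread().getName());

           // ...but a worker thread (standing in for a StreamThread) invokes it,
           // so the callback body runs on the worker, not on main.
           final Thread worker =
               new Thread(() -> listener.onChange("RUNNING"), "stream-thread-1");
           worker.start();
           worker.join();

           System.out.println("listener created on: " + Thread.currentThread().getName());
           System.out.println("onChange ran on: " + callingThreads.get(0));
       }
   }
   ```

   Run as a standalone program, this prints `listener created on: main` and
`onChange ran on: stream-thread-1`. Anything called from `onChange`, such as
`closeStartupTasks()`, therefore sees the worker as `Thread.currentThread()`,
not the thread that originally took the task locks.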
   
   In this case, the problem must be triggered (without being caught by the
unit/integration tests) because some local startup tasks are left over after
the assignment completes. When the assignment completes, we close any
remaining startup tasks because they're no longer needed.
   
   I also found a few other ways that `closeStartupTasks()` could be called
from a different thread, each of which would trigger the same problem:
   
   1. During shutdown, `StateDirectory.close()` is called by a
["shutdown helper thread"](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1470)
that `KafkaStreams` creates to handle the shutdown process. This would cause
the same hang if the application is shut down before it has transitioned to
the `RUNNING` state.
   2. When the state cleaner thread cleans out an empty `NamedTopology`
directory, it also calls `closeStartupTasks()`.
   3. The user calls `KafkaStreams.cleanUp()` from a different thread than the 
one that called `KafkaStreams.start()`.
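   All three cases share the same shape: one thread takes the lock and a
different thread attempts the close. A minimal, hypothetical model of an
owner-checked lock map shows why the lock is never released in that situation
and how the `replace()` call in the diff fixes it. The `lock`/`unlock` helpers
and the string task id `"0_0"` are assumptions for illustration, not
`StateDirectory`'s actual methods or types.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical model of owner-checked task locks; NOT the real StateDirectory.
   public class TaskLockOwnershipDemo {

       // Maps a task id to the thread that currently owns its lock.
       static final Map<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

       // Acquires the lock if it is free, or succeeds if the caller already owns it.
       static boolean lock(final String taskId) {
           final Thread owner = lockedTasksToOwner.putIfAbsent(taskId, Thread.currentThread());
           return owner == null || owner == Thread.currentThread();
       }

       // In this model, only the owning thread can release the lock;
       // an unlock() from any other thread is silently ignored.
       static void unlock(final String taskId) {
           lockedTasksToOwner.remove(taskId, Thread.currentThread());
       }

       // Runs an action on a fresh thread and waits for it, mimicking the shutdown
       // helper, the state cleaner, or a user thread calling cleanUp().
       static void runOnOtherThread(final Runnable action) throws InterruptedException {
           final Thread t = new Thread(action, "other-thread");
           t.start();
           t.join();
       }

       public static void main(final String[] args) throws InterruptedException {
           // The main thread locks the startup task and then moves on.
           lock("0_0");

           // Without an ownership swap, the other thread's unlock() is a no-op.
           runOnOtherThread(() -> unlock("0_0"));
           System.out.println("still locked without swap: " + lockedTasksToOwner.containsKey("0_0"));

           // Swapping ownership first (as the diff does) lets the closing thread release it.
           runOnOtherThread(() -> {
               lockedTasksToOwner.replace("0_0", Thread.currentThread());
               unlock("0_0");
           });
           System.out.println("still locked after swap: " + lockedTasksToOwner.containsKey("0_0"));
       }
   }
   ```

   Running `main` prints `still locked without swap: true` followed by
`still locked after swap: false`.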
   
   Bill's fix addresses all of these cases.
   
   ---
   
   @bbejeck the solution looks good to me, although there's one minor
improvement you may want to make: instead of adding this line, I think we
should change line 289 from:
   
   ```java
   if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
   ```
   
   to:
   
   ```java
   if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
   ```
   
   This keeps the lock-ownership update logic encapsulated in a single
method, which we should always use when removing tasks from the
`tasksForLocalState` map.
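   A minimal sketch of that encapsulation, under heavy simplifying
assumptions: `Task` is reduced to a record holding a string id, both maps are
plain `ConcurrentHashMap`s, and the bodies of `removeStartupTask()` and
`closeStartupTasks()` below are illustrative, not the real `StateDirectory`
code. The point is that the ownership swap lives inside `removeStartupTask()`,
so every caller that removes from `tasksForLocalState` gets it automatically.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Predicate;

   // Hypothetical, simplified model of the suggested refactoring; the real
   // StateDirectory types and signatures differ.
   public class RemoveStartupTaskDemo {

       record Task(String id) { }

       static final Map<String, Task> tasksForLocalState = new ConcurrentHashMap<>();
       static final Map<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

       // The one place that removes a startup task; it also hands the task's
       // lock to the calling thread so no caller can forget the swap.
       static Task removeStartupTask(final String taskId) {
           final Task removed = tasksForLocalState.remove(taskId);
           if (removed != null) {
               lockedTasksToOwner.replace(taskId, Thread.currentThread());
           }
           return removed;
       }

       static void closeStartupTasks(final Predicate<Task> predicate) {
           final List<Task> drainedTasks = new ArrayList<>();
           for (final Map.Entry<String, Task> entry : tasksForLocalState.entrySet()) {
               // The suggested condition: removal always goes through removeStartupTask().
               if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
                   drainedTasks.add(entry.getValue());
               }
           }
           // The current thread now owns the drained tasks' locks, so it can release them.
           for (final Task task : drainedTasks) {
               lockedTasksToOwner.remove(task.id(), Thread.currentThread());
           }
       }

       public static void main(final String[] args) throws InterruptedException {
           // The main thread registers and locks two startup tasks, then moves on.
           for (final String id : List.of("0_0", "0_1")) {
               tasksForLocalState.put(id, new Task(id));
               lockedTasksToOwner.put(id, Thread.currentThread());
           }

           // Another thread (standing in for a StreamThread) closes only task 0_0.
           final Thread closer = new Thread(
               () -> closeStartupTasks(task -> task.id().equals("0_0")), "stream-thread-1");
           closer.start();
           closer.join();

           System.out.println("remaining tasks: " + tasksForLocalState.keySet());
           System.out.println("remaining locks: " + lockedTasksToOwner.keySet());
       }
   }
   ```

   Running `main` prints `remaining tasks: [0_1]` and `remaining locks: [0_1]`:
task `0_0` is removed and its lock released by the closing thread, while
`0_1`, which the predicate does not match, stays registered and locked by the
main thread.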



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

