mjsax commented on code in PR #18732:
URL: https://github.com/apache/kafka/pull/18732#discussion_r1933071908


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -295,6 +294,9 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
 
             // now that we have exclusive ownership of the drained tasks, close them
             for (final Task task : drainedTasks) {
+                // main thread locked the task initially on startup, but has moved on and will not unlock
+                // so we need to explicitly swap lock ownership here as this method is called by a StreamThread
+                lockedTasksToOwner.replace(task.id(), Thread.currentThread());

Review Comment:
   I understand that `StandbyTask.closeClean()` would block, as it cannot get 
the lock. I guess this only happens on the `StandbyTask.closeClean()` path, 
because "startup tasks" are always standbys, right? So it can never happen that 
we call `StreamTask.closeClean()` to clean up an unused "startup task".
   
   However, I am not sure I understand this fix. Transferring ownership from 
the main thread to a `StreamThread` implies that a `StreamThread` would call 
`closeStartupTasks(...)`, but I could not find the code that would result in 
this call. It seems `closeStartupTasks(...)` is only called from the 
`KafkaStreams` instance, i.e., the main thread, with the exception of the 
background state-dir-cleaner thread.
   
   Can you help me out with it?
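
To make the question concrete, here is a minimal sketch of what I understand the `replace(...)` call to do mechanically. `LockOwnerSketch`, its `lock`/`unlock`/`takeOwnership` methods, and the `String` task ids are hypothetical stand-ins, not the actual `StateDirectory` code; the sketch only assumes that unlocking succeeds for the thread recorded as owner.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the owner map in the diff; not the actual StateDirectory code.
class LockOwnerSketch {
    private final ConcurrentMap<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

    // Succeeds if the task is unlocked or already owned by the calling thread.
    boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.putIfAbsent(taskId, Thread.currentThread());
        return owner == null || owner == Thread.currentThread();
    }

    // Only the recorded owner may unlock; remove(key, value) is atomic.
    boolean unlock(final String taskId) {
        return lockedTasksToOwner.remove(taskId, Thread.currentThread());
    }

    // Mirrors the replace(...) call in the diff: swaps the owner only if the task is locked.
    boolean takeOwnership(final String taskId) {
        return lockedTasksToOwner.replace(taskId, Thread.currentThread()) != null;
    }

    public static void main(final String[] args) throws InterruptedException {
        final LockOwnerSketch locks = new LockOwnerSketch();
        locks.lock("0_0"); // the "main" thread locks the task on startup

        final boolean[] results = new boolean[3];
        final Thread closer = new Thread(() -> {
            results[0] = locks.unlock("0_0");        // not the owner yet
            results[1] = locks.takeOwnership("0_0"); // swap ownership to this thread
            results[2] = locks.unlock("0_0");        // now the owner
        });
        closer.start();
        closer.join(); // join() makes the results visible to this thread

        System.out.println(results[0] + " " + results[1] + " " + results[2]);
        // prints "false true true"
    }
}
```

The swap only matters if the calling thread differs from the thread that took the lock, which is why I am asking which thread actually reaches `closeStartupTasks(...)`.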



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -107,7 +106,7 @@ public StateDirectoryProcessFile() {
     private final boolean hasPersistentStores;
     private final boolean hasNamedTopologies;
 
-    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
+    private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

Review Comment:
   I think it was already used across threads before this change: `TaskManager` 
actually calls `StateDirectory#removeStartupTask(...)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.