cadonna commented on code in PR #12771:
URL: https://github.com/apache/kafka/pull/12771#discussion_r1009425293


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task,
     }
 
     private void addTasksToStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
         for (final Task task : tasks.drainPendingTaskToInit()) {
-            task.initializeIfNeeded();
-            stateUpdater.add(task);
+            try {
+                task.initializeIfNeeded();
+                stateUpdater.add(task);
+            } catch (final RuntimeException e) {
+                // need to add task back to the bookkeeping to be handled by the stream thread
+                tasks.addTask(task);

Review Comment:
   I agree that passing the entire task to an exception is risky, because an exception has no explicit way to clean the task up, unlike `TaskManager` and `StateUpdater`, which have `shutdown()` methods. However, I am not a fan of a central place for managing all tasks. We explicitly decided on the invariant that a task managed by the state updater must not exist in the task manager, and vice versa. This avoids concurrent modifications of the tasks by the stream thread and the state updater thread.
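A minimal sketch of that invariant, combined with the failure path from the diff above. All names here (`SketchTask`, `SketchStateUpdater`, `SketchTaskManager`, `owns()`) are hypothetical stand-ins, not Kafka's internal `TaskManager`/`StateUpdater` APIs, and threading is omitted. After the hand-off each task is owned by exactly one side: the state updater if initialization succeeded, or the task manager if it failed, with the exception recorded per task id.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a stream task (not Kafka's Task interface).
final class SketchTask {
    final String id;
    final boolean failOnInit;

    SketchTask(final String id, final boolean failOnInit) {
        this.id = id;
        this.failOnInit = failOnInit;
    }

    void initializeIfNeeded() {
        if (failOnInit) {
            throw new IllegalStateException("init failed for " + id);
        }
    }
}

// Hypothetical owner on the state updater side.
final class SketchStateUpdater {
    private final Map<String, SketchTask> owned = new HashMap<>();

    void add(final SketchTask task) {
        owned.put(task.id, task);
    }

    Collection<SketchTask> getTasks() {
        return new ArrayList<>(owned.values());
    }
}

// Hypothetical owner on the stream thread side.
final class SketchTaskManager {
    private final Map<String, SketchTask> owned = new HashMap<>();
    private final List<SketchTask> pendingInit = new ArrayList<>();
    final SketchStateUpdater stateUpdater = new SketchStateUpdater();

    void addPending(final SketchTask task) {
        pendingInit.add(task);
    }

    boolean owns(final String id) {
        return owned.containsKey(id);
    }

    // Drains pending tasks; each one ends up owned by exactly one side.
    Map<String, RuntimeException> addTasksToStateUpdater() {
        final Map<String, RuntimeException> taskExceptions = new LinkedHashMap<>();
        final List<SketchTask> drained = new ArrayList<>(pendingInit);
        pendingInit.clear();
        for (final SketchTask task : drained) {
            try {
                task.initializeIfNeeded();
                stateUpdater.add(task);          // ownership moves to the state updater
            } catch (final RuntimeException e) {
                owned.put(task.id, task);        // ownership stays with the task manager
                taskExceptions.put(task.id, e);  // insertion order preserved
            }
        }
        return taskExceptions;
    }
}
```

Because a failed task never reaches the state updater, the stream thread can handle it without touching anything the state updater thread might modify concurrently.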
   We need to evaluate whether, in the places where all tasks are returned, all tasks really need to be returned. If they do, we also need to call `getTasks()` on the state updater.
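A minimal sketch of the second point, assuming a hypothetical `UpdaterView` interface that exposes only a `getTasks()`-style query; for brevity it works on task ids rather than task objects, and `AllTasksSketch` is likewise a hypothetical name. A caller that really needs every task has to union both owners. Since the invariant makes the two sets disjoint, an overlap is treated as a bug.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified view of the state updater: only the query
// mentioned in the review comment, returning task ids.
interface UpdaterView {
    Set<String> getTasks();
}

final class AllTasksSketch {
    // Task ids owned by the task manager (stream thread side).
    private final Set<String> managerTasks = new HashSet<>();
    private final UpdaterView stateUpdater;

    AllTasksSketch(final UpdaterView stateUpdater) {
        this.stateUpdater = stateUpdater;
    }

    void addManagedTask(final String id) {
        managerTasks.add(id);
    }

    // Returns every task id by consulting both owners; fails fast if a
    // task is owned by both sides, which would violate the invariant.
    Set<String> allTaskIds() {
        final Set<String> all = new HashSet<>(managerTasks);
        for (final String id : stateUpdater.getTasks()) {
            if (!all.add(id)) {
                throw new IllegalStateException("task " + id + " is owned by both sides");
            }
        }
        return Collections.unmodifiableSet(all);
    }
}
```

With the task manager owning `0_0` and the state updater reporting `0_1`, `allTaskIds()` returns both ids; if both sides reported `0_0`, it would throw `IllegalStateException`.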



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
