cadonna commented on code in PR #12771: URL: https://github.com/apache/kafka/pull/12771#discussion_r1009425293
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task, } private void addTasksToStateUpdater() { + final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>(); for (final Task task : tasks.drainPendingTaskToInit()) { - task.initializeIfNeeded(); - stateUpdater.add(task); + try { + task.initializeIfNeeded(); + stateUpdater.add(task); + } catch (final RuntimeException e) { + // need to add task back to the bookkeeping to be handled by the stream thread + tasks.addTask(task); Review Comment: I think I agree with the risk of passing the entire task to an exception that does not have an explicit way how to cleanup itself like the `shutdown()`methods in `TaskManager` and `StateUpdater`. However, I am not a fan of a central place for managing all tasks. We explicitly decided to have the invariant that a task that is managed by the state updater should not exist in the task manager and vice versa. That is to avoid concurrent modifications of the tasks by both threads. We need to evaluate if in the places where all tasks are returned really all tasks need to be returned and if yes we also need to call `getTasks()` on the state updater. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org