ableegoldman commented on code in PR #12771:
URL: https://github.com/apache/kafka/pull/12771#discussion_r1006792420
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -810,10 +810,19 @@ private void transitRestoredTaskToRunning(final Task task,
     }
 
     private void addTasksToStateUpdater() {
+        final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
         for (final Task task : tasks.drainPendingTaskToInit()) {
-            task.initializeIfNeeded();
-            stateUpdater.add(task);
+            try {
+                task.initializeIfNeeded();
+                stateUpdater.add(task);
+            } catch (final RuntimeException e) {
+                // need to add task back to the bookkeeping to be handled by the stream thread
+                tasks.addTask(task);

Review Comment:
   FWIW, w.r.t. the idea of attaching the entire task instead of just its taskId, I agree with Lucas here: it seems risky and would likely end up being abused as things progress (by us, not the users), but most importantly, imo, it's just bad practice. I'm not too well versed in anti-patterns, I'll admit (perks & downsides of being a physics major 😉), but I would think this counts as one (or should): we shouldn't use exceptions to pass around actual objects rather than simple info/metadata. Especially objects of uncertain status, questionable thread safety, and significant resources attached.

   Just my two cents as someone who's coming into the restoration thread game very late. Given this fact, I'm not particularly qualified to weigh in on what the _right_ thing to do is here, though again I feel like Lucas's suggestion to maintain a/the `TaskRegistry` as a central, source-of-truth type thing for tracking and organizing all the tasks

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
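
For illustration only, the point made in the review comment above (an exception should carry the task's id, and the task itself should be looked up from a central registry) might look roughly like the sketch below. Everything in it is an assumption made for the example: `TaskFailedException`, the stand-in `Task` class, the `String` task ids, and the `add`/`lookup`/`all` methods on `TaskRegistry` are hypothetical and are not the Kafka Streams API or the code in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class TaskIdExceptionSketch {

    // Hypothetical stand-in for a task; a real task is stateful and holds resources.
    static final class Task {
        final String id;              // a plain String stands in for TaskId here
        final boolean failOnInit;

        Task(final String id, final boolean failOnInit) {
            this.id = id;
            this.failOnInit = failOnInit;
        }

        void initializeIfNeeded() {
            if (failOnInit) {
                throw new IllegalStateException("init failed for " + id);
            }
        }
    }

    // Hypothetical exception type: it carries only the id (metadata), never the Task.
    static final class TaskFailedException extends RuntimeException {
        private final String taskId;

        TaskFailedException(final String taskId, final Throwable cause) {
            super("Task " + taskId + " failed", cause);
            this.taskId = taskId;
        }

        String taskId() {
            return taskId;
        }
    }

    // Hypothetical registry acting as the single source of truth for tasks.
    static final class TaskRegistry {
        private final Map<String, Task> tasks = new LinkedHashMap<>();

        void add(final Task task) {
            tasks.put(task.id, task);
        }

        Optional<Task> lookup(final String taskId) {
            return Optional.ofNullable(tasks.get(taskId));
        }

        List<Task> all() {
            return new ArrayList<>(tasks.values());
        }
    }

    // Initializes every registered task and collects failures keyed by task id.
    static Map<String, RuntimeException> initializeAll(final TaskRegistry registry) {
        final Map<String, RuntimeException> taskExceptions = new LinkedHashMap<>();
        for (final Task task : registry.all()) {
            try {
                task.initializeIfNeeded();
            } catch (final RuntimeException e) {
                // Record only the id; the task stays in the registry for later handling.
                taskExceptions.put(task.id, new TaskFailedException(task.id, e));
            }
        }
        return taskExceptions;
    }

    public static void main(final String[] args) {
        final TaskRegistry registry = new TaskRegistry();
        registry.add(new Task("0_0", false));
        registry.add(new Task("0_1", true));

        // The handler resolves the failed task through the registry, not the exception.
        for (final String failedId : initializeAll(registry).keySet()) {
            final boolean known = registry.lookup(failedId).isPresent();
            System.out.println("Task " + failedId + " failed; still registered: " + known);
        }
    }
}
```

The design choice in this sketch is that whoever handles the failure asks the registry for the task by id, so the exception never holds a reference to an object whose state, thread safety, or attached resources are uncertain.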