ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r431482714

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -272,6 +274,30 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
+        if (taskCloseExceptions.isEmpty()) {
+            for (final Task oldTask : tasksToRecycle) {
+                final Task newTask;
+                try {
+                    if (oldTask.isActive()) {
+                        final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
+                        newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    } else {
+                        final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
+                        newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
+                    }
+                    tasks.remove(oldTask.id());
+                    addNewTask(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id());
+                    log.error(uncleanMessage, e);
+                    taskCloseExceptions.put(oldTask.id(), e);
+                    dirtyTasks.add(oldTask);
+                }
+            }
+        } else {
+            dirtyTasks.addAll(tasksToRecycle);
+        }

Review comment:
Yeah, it still seems to me that if we have to close any tasks as dirty, we will ultimately have to do so for all of them (as in `handleLostAll`). But that's a big assumption, and even if it is true now, it may not always be. I'll remove this.