vvcephei commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r448706412
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, if (oldTask.isActive()) { final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); + cleanUpTaskProducer(oldTask, taskCloseExceptions); } else { oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already Review comment: Your words sound good to me, but they are directly in contradiction to the code here, which skips suspending and committing active tasks because it assumes they have already happened. In other words, there is an assumption here that the active tasks are in some kind of "committed" state, while the standbys are in either "created" or "running". If we're going to have a branch that explicitly assumes the task is already committed, then I'd like to verify it, otherwise experience says it will become false after refactoring, and we'd wind up trying to track down an IllegalStateException later on. On the other hand, if these transitions are idempotent, we can just include them in both branches (or rather move them outside the conditional block). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org