ableegoldman commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r448714590
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, if (oldTask.isActive()) { final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); + cleanUpTaskProducer(oldTask, taskCloseExceptions); } else { oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already Review comment: Also, the `commit` case is trickier than the `suspend` as it's possible we actually _don't_ commit a task before closing it, if the commit or any commit preparation fails. So we can't just assert that the task is committed. We can assert that we attempted to commit it by keeping track of tasks we tried to commit/revoke between `handleRevocation` and `handleAssignment`, but I'm quite confident that would quickly get out of control. We could introduce a `#commitAttempted` method on the Task but that also seems to invite bugs We could leave it up to the Task to make sure everything is done safely during the close procedure. What the task currently does is verify that no commit is needed if a clean close is attempted -- if we try to close clean but a commit _is_ still needed, it means the commit failed, and we can throw an exception to force the TM to closeDirty ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org