ableegoldman commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448714590



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       Also, the `commit` case is trickier than the `suspend` case: it's possible 
that we _don't_ actually commit a task before closing it, if the commit or any 
commit preparation fails. So we can't just assert that the task is committed.
   
   We could assert that we at least attempted to commit it by keeping track of 
the tasks we tried to commit/revoke between `handleRevocation` and 
`handleAssignment`, but I'm quite confident that would quickly get out of control.
   We could introduce a `#commitAttempted` method on the Task, but that also 
seems to invite bugs.
   We could leave it up to the Task to make sure everything is done safely 
during the close procedure. What the task currently does is verify that no 
commit is needed when a clean close is attempted: if we try to close clean but 
a commit _is_ still needed, it means the commit failed, and we can throw an 
exception to force the TM to closeDirty.
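
   To make the last option concrete, here is a rough, self-contained sketch of the pattern. `CloseSketch`, `SketchTask`, and `closeAll` are hypothetical names invented for illustration; this is not the actual `Task` or `TaskManager` code, only the shape of "refuse a clean close while a commit is pending, then fall back to a dirty close".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CloseSketch {

    /** Hypothetical, heavily simplified stand-in for a task (not the real Task interface). */
    static class SketchTask {
        private final String id;
        private final boolean commitNeeded;
        private String state = "RUNNING";

        SketchTask(final String id, final boolean commitNeeded) {
            this.id = id;
            this.commitNeeded = commitNeeded;
        }

        /** A clean close is only legal once nothing is left to commit. */
        void closeClean() {
            if (commitNeeded) {
                // A pending commit at this point means the earlier commit attempt failed.
                throw new IllegalStateException("Task " + id + " still needs a commit");
            }
            state = "CLOSED_CLEAN";
        }

        /** A dirty close gives up on committing and just releases the task. */
        void closeDirty() {
            state = "CLOSED_DIRTY";
        }

        String state() {
            return state;
        }
    }

    /** Hypothetical manager-side loop: try a clean close, fall back to dirty on failure. */
    static List<RuntimeException> closeAll(final List<SketchTask> tasks) {
        final List<RuntimeException> closeExceptions = new ArrayList<>();
        for (final SketchTask task : tasks) {
            try {
                task.closeClean();
            } catch (final RuntimeException e) {
                closeExceptions.add(e); // remember the failure so it can be rethrown later
                task.closeDirty();
            }
        }
        return closeExceptions;
    }

    public static void main(final String[] args) {
        final SketchTask committed = new SketchTask("0_0", false);
        final SketchTask failedCommit = new SketchTask("0_1", true);
        final List<RuntimeException> errors = closeAll(Arrays.asList(committed, failedCommit));
        System.out.println(committed.state());    // CLOSED_CLEAN
        System.out.println(failedCommit.state()); // CLOSED_DIRTY
        System.out.println(errors.size());        // 1
    }
}
```

   The point of this shape is that the manager never has to track which tasks it tried to commit: the task's own pending-commit check is the single source of truth, and a failed commit surfaces as an exception at close time.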



