vvcephei commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448706412



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
   Your description sounds good to me, but it directly contradicts the code 
here, which skips suspending and committing active tasks on the assumption that 
both have already happened. In other words, the code assumes that the active 
tasks are in some kind of "committed" state, while the standbys are in either 
"created" or "running".
   
   If we're going to have a branch that explicitly assumes the task is already 
committed, then I'd like to verify that assumption. Otherwise, experience says 
it will become false after some refactoring, and we'd wind up tracking down an 
IllegalStateException later on.
   
   On the other hand, if these transitions are idempotent, we can just include 
them in both branches (or rather, move them outside the conditional block).
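
   To make the two options concrete, here is a minimal sketch. It is not the 
actual TaskManager or Task code: `TransitionSketch`, `SketchTask`, `State`, 
`requireSuspended`, and `effectiveSuspends` are hypothetical names, and the 
three-state model is a simplification assumed purely for illustration.

```java
// Illustrative sketch only; these types are hypothetical and are NOT Kafka Streams classes.
class TransitionSketch {

    // Simplified, assumed state model (the real task lifecycle has more states).
    enum State { CREATED, RUNNING, SUSPENDED }

    static final class SketchTask {
        private State state = State.CREATED;
        private int effectiveSuspends = 0; // counts only calls that changed the state

        State state() { return state; }

        int effectiveSuspends() { return effectiveSuspends; }

        void start() { state = State.RUNNING; }

        // Option 2: an idempotent transition. Calling it on an already
        // suspended task is a no-op, so callers need not branch on task type.
        void suspend() {
            if (state == State.SUSPENDED) {
                return;
            }
            state = State.SUSPENDED;
            effectiveSuspends++;
        }
    }

    // Option 1: verify the assumption explicitly, so a violation fails at the
    // point where the assumption is made rather than somewhere downstream.
    static void requireSuspended(final SketchTask task) {
        if (task.state() != State.SUSPENDED) {
            throw new IllegalStateException(
                "Expected task to be SUSPENDED but it was " + task.state());
        }
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask();
        task.start();
        task.suspend();
        task.suspend();         // second call changes nothing
        requireSuspended(task); // passes, because the task is suspended
        System.out.println("effective suspends: " + task.effectiveSuspends());
    }
}
```

   With an idempotent `suspend()`, the call could sit outside the conditional 
block; with `requireSuspended`, the active branch would keep skipping the call 
but would fail fast if the assumption ever stopped holding.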





