ableegoldman commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r450429655



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       I won't hold up this PR on this point, but I just don't see how checking
the state of the task here, at this random point in the TM code, and throwing
an exception if it's not suspended is any clearer than checking the state in
the task itself and throwing an exception in `Task#close` if it's not suspended.
   
   The latter makes it very clear what the problem is -- tasks need to be 
suspended before they are closed, end of story. The former seems to enforce 
suspension at an arbitrary point in the TM.
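
   To make the contrast concrete, here is a minimal sketch of what I mean by
enforcing the invariant inside the task. Everything in it is hypothetical and
simplified for illustration (`SketchTask`, its three-value `State` enum, and
the exact exception message are assumptions, not the real `Task`/`StreamTask`
code): `close()` itself refuses to run unless the task is already suspended, so
no caller in the TM has to re-check the state.

```java
public class SuspendBeforeCloseSketch {

    // Hypothetical, simplified lifecycle; not the actual Kafka Streams task states.
    enum State { RUNNING, SUSPENDED, CLOSED }

    // Hypothetical stand-in for a task; only models the suspend/close ordering.
    static class SketchTask {
        private final String id;
        private State state = State.RUNNING;

        SketchTask(final String id) {
            this.id = id;
        }

        State state() {
            return state;
        }

        // Suspending a running task moves it to SUSPENDED; otherwise a no-op.
        void suspend() {
            if (state == State.RUNNING) {
                state = State.SUSPENDED;
            }
        }

        // The invariant lives here: a task must be suspended before it is closed.
        void close() {
            if (state == State.CLOSED) {
                return; // closing twice is treated as a no-op in this sketch
            }
            if (state != State.SUSPENDED) {
                throw new IllegalStateException(
                    "Task " + id + " must be suspended before it is closed, but was " + state);
            }
            state = State.CLOSED;
        }
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask("0_1");
        try {
            task.close(); // caller forgot to suspend first
        } catch (final IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        task.suspend();
        task.close();
        System.out.println("Final state: " + task.state());
    }
}
```

   With the check in one place, any caller that skips the suspension fails at
the close itself, regardless of which branch of the TM it came from.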
   
   >Why should this be true? Because some other code deep inside another set of 
nested conditionals thirty lines above looks like it does that right now
   
   This makes me wonder if I'm misinterpreting you, though, at least in part.
Are you saying that it's fine to make this assumption thirty lines up when
dealing with tasks that are actually being closed, but it's not ok to make it
here, specifically when we are recycling tasks? Or are you saying we should add
this check in both places where we make this assumption in
`TM#handleAssignment`, and that "thirty lines up" refers to where we do the
suspension? (It's technically down, so maybe I'm being too literal here.)





