ableegoldman commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r450429655
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
I won't hold up this PR on this point, but I just don't see how checking the state of the task here, and throwing an exception if it's not suspended, at this random point in the TM code is any clearer than checking the state in the task itself and throwing an exception if it's not suspended in `Task#close`.

The latter makes it very clear what the problem is -- tasks need to be suspended before they are closed, end of story. The former seems to enforce suspension at an arbitrary point in the TM.

> Why should this be true? Because some other code deep inside another set of nested conditionals thirty lines above looks like it does that right now

This makes me wonder if I'm misinterpreting you, though, at least in part. Are you saying that it's fine to make this assumption thirty lines up when dealing with tasks that are actually being closed, but it's not ok to make this assumption here, specifically when we are recycling tasks? Or are you saying we should add this check in both places where we make this assumption in `TM#handleAssignment`, and that "thirty lines up" refers to where we do the suspension? (It's technically down, so maybe I'm being too literal here.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
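
For readers following the thread, the alternative argued for in the review comment (enforce "suspended before closed" inside the task's own `close` rather than at a call site in the task manager) might look roughly like the sketch below. Everything in it is an assumption made for illustration: `SketchTask`, its three-value `State` enum, and the exception messages are hypothetical and are not the actual Kafka Streams `Task` API or state machine.

```java
public class SuspendBeforeCloseSketch {

    /** Hypothetical, simplified states; not the real Kafka Streams task states. */
    enum State { RUNNING, SUSPENDED, CLOSED }

    /** Hypothetical stand-in for a task that guards its own close(). */
    static final class SketchTask {
        private final String id;
        private State state = State.RUNNING;

        SketchTask(final String id) {
            this.id = id;
        }

        State state() {
            return state;
        }

        /** Suspending is allowed from RUNNING or SUSPENDED, never after close. */
        void suspend() {
            if (state == State.CLOSED) {
                throw new IllegalStateException("Task " + id + " is already closed");
            }
            state = State.SUSPENDED;
        }

        /** The invariant lives here: a task must be suspended before it is closed. */
        void close() {
            if (state != State.SUSPENDED) {
                throw new IllegalStateException(
                    "Task " + id + " must be suspended before close, but was " + state);
            }
            state = State.CLOSED;
        }
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask("0_1");

        // Closing a running task is rejected by the task itself,
        // regardless of which caller forgot to suspend it.
        try {
            task.close();
        } catch (final IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // The expected order: suspend first, then close.
        task.suspend();
        task.close();
        System.out.println("Final state: " + task.state());
    }
}
```

With the check in one place, every caller that closes a task gets the same error, so no individual branch of a method like `handleAssignment` has to re-verify the state on its own.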