ableegoldman commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448712536



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(oldTask.id());
                     newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning 
standbys, actives should be suspended already

Review comment:
       While `suspend` is technically idempotent, the safer thing is actually 
to _not_ also call it for active tasks here. But let me back up a bit and try 
to further clarify my earlier response. I realize I made this claim without any 
hint as to what specifically I was referring to:
   
   By `we do assert/enforce this, but in the StreamTask` I meant that if we try 
to close a task without first suspending it, we will get an 
IllegalStateException in the `Task#close` method. The TaskManager is 
responsible for "managing" the operations on the task, but the task is 
ultimately responsible for verifying that its lifecycle makes sense and its 
state/transitions are valid. If we just blindly suspend everything here, it 
seems worse than getting an IllegalStateException because we lose the check 
that the task was prepared for close (ie suspended + committed) earlier during 
`handleRevocation`.
   
   > I'd like to verify it, otherwise experience says it will become false 
after refactoring, and we'd wind up trying to track down an 
IllegalStateException later on
   
   On that note, I'm slightly confused by your proposal:  what do you mean by 
"verify" if not to throw an IllegalStateException if it's not what's expected? 
Or do you just mean we'd have to track down an IllegalStateException slightly 
farther from the root cause




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to