vvcephei commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r450513288



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       Thanks, all!
   
   Thanks for adding the check, @ableegoldman. To answer your question, I do 
think there may be some kind of misunderstanding. Almost certainly, I failed to 
make myself clear.
   
   I agree that the reason we enforce the valid transitions inside the task is 
so that the TM doesn't have to check every little thing. For example, that we 
call suspend in one place and close in another is not a big deal. As you 
pointed out, if we failed to call suspend, then the close call would let us 
know.
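
To illustrate the point about the task enforcing its own transitions, here is a minimal sketch. `SketchTask` and its three states are hypothetical stand-ins, not the real Streams `Task` interface; the only idea taken from the discussion is that `close()` itself complains if `suspend()` was skipped, so the caller does not need to check.

```java
// Hypothetical, simplified stand-in for a task; NOT the real Kafka Streams Task API.
final class SketchTask {
    enum State { RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;

    State state() {
        return state;
    }

    // Assumption for this sketch: suspend is only legal from RUNNING.
    void suspend() {
        if (state != State.RUNNING) {
            throw new IllegalStateException("Cannot suspend a task in state " + state);
        }
        state = State.SUSPENDED;
    }

    // The task validates the transition, so a caller that forgot to
    // suspend finds out here instead of silently closing a running task.
    void close() {
        if (state != State.SUSPENDED) {
            throw new IllegalStateException("Cannot close a task in state " + state);
        }
        state = State.CLOSED;
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask();
        try {
            task.close(); // suspend() was skipped on purpose
        } catch (final IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        task.suspend();
        task.close();
        System.out.println("Final state: " + task.state());
    }
}
```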
   
   However, in this case, we are checking something, but what we're checking is 
only vaguely related to what we do next. In other words, 
`if (!isSuspended()) suspend()` makes way more sense than 
`if (isStandby()) suspend()`. How are we supposed to know that "standby implies 
not suspended" at line 275 of a 700-line class? At least, with the log you 
added, if that implication fails to hold in either direction, we'll have an 
error log telling us where it went wrong.
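
As a rough sketch of that argument: guard on the property the next call actually depends on, and record a violation whenever the assumed relationship between "active" and "suspended" does not hold. Everything here (`GuardSketch`, `DemoTask`, `suspendIfNeeded`, and the list of violations standing in for an error log) is hypothetical and is not the code in `TaskManager`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; names do not come from the Kafka code base.
final class GuardSketch {

    // Minimal task view with just the two properties under discussion.
    static final class DemoTask {
        private final boolean active;
        private boolean suspended;

        DemoTask(final boolean active, final boolean suspended) {
            this.active = active;
            this.suspended = suspended;
        }

        boolean isActive() {
            return active;
        }

        boolean isSuspended() {
            return suspended;
        }

        void suspend() {
            suspended = true;
        }
    }

    // Assumed invariant at this point: actives are already suspended and
    // standbys are not. A violation is recorded rather than silently ignored.
    // Returns true if this call performed the suspend.
    static boolean suspendIfNeeded(final DemoTask task, final List<String> violations) {
        if (task.isActive() != task.isSuspended()) {
            violations.add("Unexpected state: active=" + task.isActive()
                + ", suspended=" + task.isSuspended());
        }
        // The guard names the state that suspend() changes, not the task type.
        if (!task.isSuspended()) {
            task.suspend();
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final List<String> violations = new ArrayList<>();
        suspendIfNeeded(new DemoTask(false, false), violations); // standby, as expected
        suspendIfNeeded(new DemoTask(true, false), violations);  // active but not suspended
        violations.forEach(System.out::println);
    }
}
```

With the guard written this way, a reader does not need to know which task types arrive suspended; the condition and the action refer to the same state.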
   
   Note, I think I was especially uneasy because we're now also committing the 
task in this block, and "committed" or "not committed" isn't a checked state, 
which is how we wound up with the subtle bug that you're fixing here. I think 
your fix is fine; I just felt the pre-existing structure of the code needed 
improvement.
   
   And thanks for the alternative proposals, @guozhangwang. I agree that 
either one of them would resolve my concern, and that we can take care of it in 
a follow-on PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

