vamossagar12 commented on code in PR #16628:
URL: https://github.com/apache/kafka/pull/16628#discussion_r1683244575


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1478,7 +1478,9 @@ public void restartTask(final ConnectorTaskId id, final Callback<Void> callback)
                 if (assignment.tasks().contains(id)) {
                     try (TickThreadStage stage = new TickThreadStage("restarting task " + id)) {
                         worker.stopAndAwaitTask(id);
-                        if (startTask(id))
+                        // It could happen that by the time the stop finishes, the task might not be assigned to this
+                        // worker
+                        if (assignment.tasks().contains(id) && startTask(id))

Review Comment:
   While testing, I noticed that if the stop method blocks for some reason, the
assignment may no longer contain the task we wanted to restart by the time
control returns. In that case, the task would be started on this worker even
though the worker no longer owns it. I thought it better to check the
assignment again here, just before starting the task, to guard against that.
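
   To make the scenario concrete, here is a minimal sketch of the race. It uses
a simplified, hypothetical stand-in (`RestartRecheckSketch`, with a
`revokeDuringStop` flag that simulates a rebalance while the stop is blocked)
rather than the real `DistributedHerder` and `Worker` classes, so it only
illustrates the idea and does not reproduce the actual herder code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the restart path. None of these names
// come from Kafka; they exist only to illustrate the re-check.
class RestartRecheckSketch {
    // Stand-in for the tasks currently assigned to this worker.
    private final Set<String> assignedTasks = new HashSet<>();
    // Tasks this sketch considers to be running on the worker.
    private final Set<String> runningTasks = new HashSet<>();
    // When true, the task is revoked while the stop is in progress,
    // simulating a rebalance that completes during a blocked stop.
    private final boolean revokeDuringStop;

    RestartRecheckSketch(Set<String> initialAssignment, boolean revokeDuringStop) {
        this.assignedTasks.addAll(initialAssignment);
        this.runningTasks.addAll(initialAssignment);
        this.revokeDuringStop = revokeDuringStop;
    }

    // Stops the task; optionally loses the assignment before returning.
    private void stopAndAwaitTask(String id) {
        runningTasks.remove(id);
        if (revokeDuringStop) {
            assignedTasks.remove(id);
        }
    }

    // Starts the task and reports whether it was newly started.
    private boolean startTask(String id) {
        return runningTasks.add(id);
    }

    // Restarts the task. With recheckAssignment set, the assignment is
    // consulted again after the stop returns and before the start.
    boolean restartTask(String id, boolean recheckAssignment) {
        if (!assignedTasks.contains(id)) {
            return false;
        }
        stopAndAwaitTask(id);
        if (recheckAssignment && !assignedTasks.contains(id)) {
            return false; // no longer owned by this worker, so do not start it
        }
        return startTask(id);
    }

    boolean isRunning(String id) {
        return runningTasks.contains(id);
    }

    public static void main(String[] args) {
        RestartRecheckSketch unguarded = new RestartRecheckSketch(Set.of("conn-0"), true);
        unguarded.restartTask("conn-0", false);
        System.out.println("without re-check, running: " + unguarded.isRunning("conn-0"));

        RestartRecheckSketch guarded = new RestartRecheckSketch(Set.of("conn-0"), true);
        guarded.restartTask("conn-0", true);
        System.out.println("with re-check, running: " + guarded.isRunning("conn-0"));
    }
}
```

Without the second check, the sketch leaves the revoked task running on the
worker; with it, the restart is skipped once the task is no longer assigned.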



