junrao commented on code in PR #13451:
URL: https://github.com/apache/kafka/pull/13451#discussion_r1156683028


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -802,8 +802,11 @@ class KafkaController(val config: KafkaConfig,
       // If there is a reassignment already in progress, then some of the 
currently adding replicas
       // may be eligible for immediate removal, in which case we need to stop 
the replicas.
       val unneededReplicas = 
currentAssignment.replicas.diff(reassignment.replicas)
-      if (unneededReplicas.nonEmpty)
+      if (unneededReplicas.nonEmpty) {
+        // Elect a new leader if the current leader is the inside the unneeded 
replicas.
+        moveReassignedPartitionLeaderIfRequired(topicPartition, reassignment)

Review Comment:
   Thanks for identifying this issue. Great find.
   
   Fixing this completely may be a bit tricky. The first issue is that 
`moveReassignedPartitionLeaderIfRequired` only selects the new leader from and 
sends the new leaderAndIsr request to the target replicas. We probably need 
both to be based on target + original replicas since the reassignment is not 
done at this moment.
   
   A second potential issue is that we may not be able to move the leader since 
other replicas are not in ISR. In this case, I am wondering if it's better to 
reject the new reassignment instead. This is a much rarer case and could be 
addressed in a separate jira.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to