junrao commented on code in PR #12271:
URL: https://github.com/apache/kafka/pull/12271#discussion_r901926241


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -776,33 +776,60 @@ class KafkaController(val config: KafkaConfig,
    * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If 
the reassignment
    * is cancelled, there is no way to restore the original order.
    *
-   * @param topicPartition The reassigning partition
-   * @param reassignment The new reassignment
+   * @param topicPartition The topic partition
+   * @param newAssignment The new assignment
    */
-  private def updateCurrentReassignment(topicPartition: TopicPartition, 
reassignment: ReplicaAssignment): Unit = {
+  private def maybeUpdateCurrentAssignment(topicPartition: TopicPartition, 
newAssignment: ReplicaAssignment): Unit = {
     val currentAssignment = 
controllerContext.partitionFullReplicaAssignment(topicPartition)
+    if (currentAssignment != newAssignment) {
+      if (currentAssignment.isBeingReassigned) {
+        // Cancel the current reassignment by removing unneeded replicas from 
the ISR
+        // and stopping/deleting them. Note that if the controller fails 
before updating
+        // the assignment state in Zookeeper below, these replicas may get 
restarted after
+        // controller fail-over. We expect the client would retry the 
cancellation in this case.
+        cancelReassignment(topicPartition, currentAssignment, newAssignment)
+      }
 
-    if (currentAssignment != reassignment) {
-      debug(s"Updating assignment of partition $topicPartition from 
$currentAssignment to $reassignment")
-
+      info(s"Updating assignment of partition $topicPartition from 
$currentAssignment to $newAssignment")
       // U1. Update assignment state in zookeeper
-      updateReplicaAssignmentForPartition(topicPartition, reassignment)
+      updateReplicaAssignmentForPartition(topicPartition, newAssignment)
       // U2. Update assignment state in memory
-      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, 
reassignment)
-
-      // If there is a reassignment already in progress, then some of the 
currently adding replicas
-      // may be eligible for immediate removal, in which case we need to stop 
the replicas.
-      val unneededReplicas = 
currentAssignment.replicas.diff(reassignment.replicas)
-      if (unneededReplicas.nonEmpty)
-        stopRemovedReplicasOfReassignedPartition(topicPartition, 
unneededReplicas)
+      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, 
newAssignment)
     }
+  }
 
-    if (!isAlterPartitionEnabled) {
-      val reassignIsrChangeHandler = new 
PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
-      zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+  private def cancelReassignment(

Review Comment:
   Perhaps we could add a comment on `cancelReassignment` noting that the cancellation may fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to