junrao commented on code in PR #12271: URL: https://github.com/apache/kafka/pull/12271#discussion_r901926241
########## core/src/main/scala/kafka/controller/KafkaController.scala: ##########
@@ -776,33 +776,60 @@ class KafkaController(val config: KafkaConfig,
 * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If the reassignment
 * is cancelled, there is no way to restore the original order.
 *
- * @param topicPartition The reassigning partition
- * @param reassignment The new reassignment
+ * @param topicPartition The topic partition
+ * @param newAssignment The new assignment
 */
- private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
+ private def maybeUpdateCurrentAssignment(topicPartition: TopicPartition, newAssignment: ReplicaAssignment): Unit = {
 val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+ if (currentAssignment != newAssignment) {
+ if (currentAssignment.isBeingReassigned) {
+ // Cancel the current reassignment by removing unneeded replicas from the ISR
+ // and stopping/deleting them. Note that if the controller fails before updating
+ // the assignment state in Zookeeper below, these replicas may get restarted after
+ // controller fail-over. We expect the client would retry the cancellation in this case.
+ cancelReassignment(topicPartition, currentAssignment, newAssignment)
+ }
- if (currentAssignment != reassignment) {
- debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment")
-
+ info(s"Updating assignment of partition $topicPartition from $currentAssignment to $newAssignment")
 // U1. Update assignment state in zookeeper
- updateReplicaAssignmentForPartition(topicPartition, reassignment)
+ updateReplicaAssignmentForPartition(topicPartition, newAssignment)
 // U2. Update assignment state in memory
- controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment)
-
- // If there is a reassignment already in progress, then some of the currently adding replicas
- // may be eligible for immediate removal, in which case we need to stop the replicas.
- val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas)
- if (unneededReplicas.nonEmpty)
- stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
+ controllerContext.updatePartitionFullReplicaAssignment(topicPartition, newAssignment)
 }
+ }
- if (!isAlterPartitionEnabled) {
- val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
- zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+ private def cancelReassignment(

Review Comment:
Perhaps we could add a comment that the cancellation may fail.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org