wenbingshen commented on a change in pull request #10346: URL: https://github.com/apache/kafka/pull/10346#discussion_r598088530
########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig, private def processPartitionModifications(topic: String): Unit = { def restorePartitionReplicaAssignment( topic: String, - newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment] + restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment] ): Unit = { info("Restoring the partition replica assignment for topic %s".format(topic)) - val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic)) - val existingPartitionReplicaAssignment = newPartitionReplicaAssignment - .filter(p => existingPartitions.contains(p._1.partition.toString)) - .map { case (tp, _) => - tp -> controllerContext.partitionFullReplicaAssignment(tp) - }.toMap - zkClient.setTopicAssignment(topic, controllerContext.topicIds.get(topic), - existingPartitionReplicaAssignment, + restorePartitionReplicaAssignment, controllerContext.epochZkVersion) } if (!isActive) return val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)) - val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) => + val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) => controllerContext.partitionReplicaAssignment(topicPartition).isEmpty } + val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1)) + val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case (topicPartition, _) => + controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty + } + if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) { - if (partitionsToBeAdded.nonEmpty) { + if (newPartitionsToBeAdded.nonEmpty) { warn("Skipping adding partitions %s for topic %s since it is 
currently being deleted" - .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic)) + .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), topic)) + + val existingPartitionsInZk = zkClient.getChildren(TopicPartitionsZNode.path(topic)) + val existingPartitionReplicaAssignment = partitionReplicaAssignment + .filter(p => existingPartitionsInZk.contains(p._1.partition.toString)) + .map { case (tp, _) => + tp -> controllerContext.partitionFullReplicaAssignment(tp) + }.toMap - restorePartitionReplicaAssignment(topic, partitionReplicaAssignment) + restorePartitionReplicaAssignment(topic, existingPartitionReplicaAssignment) } else { // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion info("Ignoring partition change during topic deletion as no new partitions are added") } - } else if (partitionsToBeAdded.nonEmpty) { - info(s"New partitions to be added $partitionsToBeAdded") - partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) => + } else if (oldPartitionsToBeModified.nonEmpty) { + warn("Skipping modifying existing partitions %s for topic %s, will restore their replica assignment by cache in controllerContext" + .format(oldPartitionsToBeModified.map(_._1.partition).mkString(","), topic)) + val restoreOldPartitionReplicaAssignment = partitionReplicaAssignment.map { case (topicPartition: TopicPartition, assignedReplicas: ReplicaAssignment) => + if (oldPartitionsToBeModified.contains(topicPartition)) { + (topicPartition, ReplicaAssignment(controllerContext.partitionReplicaAssignment(topicPartition), Seq.empty, Seq.empty)) + } else { + (topicPartition, assignedReplicas) + } + } + restorePartitionReplicaAssignment(topic, restoreOldPartitionReplicaAssignment) + } else if (newPartitionsToBeAdded.nonEmpty) { Review comment: > It's possible that both oldPartitionsToBeModified and newPartitionsToBeAdded are nonEmpty and we need to handle that case. 
When the assignments of existing partitions are modified, the controller restores those assignments in ZooKeeper. If new partitions are added at the same time, their assignments are written back to ZooKeeper together with the restored assignments of the existing partitions; this triggers the partition modification event again, which then completes the addition of the new partitions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org