wenbingshen commented on a change in pull request #10346:
URL: https://github.com/apache/kafka/pull/10346#discussion_r598088530



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic 
%s".format(topic))
 
-      val existingPartitions = 
zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = 
zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case 
(topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case 
(topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = 
partitionReplicaAssignment.dropWhile(partitionAndReplica => 
newPartitionsToBeAdded.contains(partitionAndReplica._1))
+    val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case 
(topicPartition, _) =>
+      
controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty
+    }
+
     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
-      if (partitionsToBeAdded.nonEmpty) {
+      if (newPartitionsToBeAdded.nonEmpty) {
         warn("Skipping adding partitions %s for topic %s since it is currently 
being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), 
topic))
+          .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), 
topic))
+
+        val existingPartitionsInZk = 
zkClient.getChildren(TopicPartitionsZNode.path(topic))
+        val existingPartitionReplicaAssignment = partitionReplicaAssignment
+          .filter(p => 
existingPartitionsInZk.contains(p._1.partition.toString))
+          .map { case (tp, _) =>
+            tp -> controllerContext.partitionFullReplicaAssignment(tp)
+          }.toMap
 
-        restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+        restorePartitionReplicaAssignment(topic, 
existingPartitionReplicaAssignment)
       } else {
         // This can happen if existing partition replica assignment are 
restored to prevent increasing partition count during topic deletion
         info("Ignoring partition change during topic deletion as no new 
partitions are added")
       }
-    } else if (partitionsToBeAdded.nonEmpty) {
-      info(s"New partitions to be added $partitionsToBeAdded")
-      partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
+    } else if (oldPartitionsToBeModified.nonEmpty) {
+      warn("Skipping modifying existing partitions %s for topic %s, will 
restore their replica assignment by cache in controllerContext"
+        .format(oldPartitionsToBeModified.map(_._1.partition).mkString(","), 
topic))
+      val restoreOldPartitionReplicaAssignment = 
partitionReplicaAssignment.map { case (topicPartition: TopicPartition, 
assignedReplicas: ReplicaAssignment) =>
+        if (oldPartitionsToBeModified.contains(topicPartition)) {
+          (topicPartition, 
ReplicaAssignment(controllerContext.partitionReplicaAssignment(topicPartition), 
Seq.empty, Seq.empty))
+        } else {
+          (topicPartition, assignedReplicas)
+        }
+      }
+      restorePartitionReplicaAssignment(topic, 
restoreOldPartitionReplicaAssignment)
+    } else if (newPartitionsToBeAdded.nonEmpty) {

Review comment:
       > It's possible that both oldPartitionsToBeModified and 
newPartitionsToBeAdded are nonEmpty and we need to handle that case.
   
   When the assignments of existing partitions are modified, the controller 
restores those existing assignments in ZooKeeper. If new partitions are also 
added, their assignments are written to ZooKeeper together with the restored 
existing partitions. That write triggers the partition-modification event again, 
and the second pass completes the addition of the new partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to