[ 
https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571070#comment-16571070
 ] 

ASF GitHub Bot commented on KAFKA-5928:
---------------------------------------

lindong28 closed pull request #3894: KAFKA-5928: Avoid redundant requests to 
zookeeper when reassign topic partition
URL: https://github.com/apache/kafka/pull/3894
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4d9da90bc69..041375a2113 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -347,13 +347,24 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, replicaAssignment)
   }
 
-  private def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, 
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
+  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, 
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
   :Map[TopicPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
-    partitionsToBeReassigned.keys.map { topicAndPartition =>
-      (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition, partitionsToBeReassigned,
-        partitionsBeingReassigned))
+    val (beingReassigned, notBeingReassigned) = 
partitionsToBeReassigned.keys.partition { topicAndPartition =>
+      partitionsBeingReassigned.contains(topicAndPartition)
+    }
+    notBeingReassigned.groupBy(_.topic).flatMap { case (topic, partitions) =>
+      val replicasForTopic = 
zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
+      partitions.map { topicAndPartition =>
+        val newReplicas = partitionsToBeReassigned(topicAndPartition)
+        val reassignmentStatus = replicasForTopic.get(topicAndPartition) match 
{
+          case Some(seq) if seq == newReplicas => ReassignmentCompleted
+          case _ => ReassignmentFailed
+        }
+        (topicAndPartition, reassignmentStatus)
+      }
+    } ++ beingReassigned.map { topicAndPartition =>
+      (topicAndPartition, ReassignmentInProgress)
     }.toMap
   }
 
@@ -398,25 +409,6 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, 
topicAndPartition: TopicPartition,
-                                            partitionsToBeReassigned: 
Map[TopicPartition, Seq[Int]],
-                                            partitionsBeingReassigned: 
Map[TopicPartition, Seq[Int]]): ReassignmentStatus = {
-    val newReplicas = partitionsToBeReassigned(topicAndPartition)
-    partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(_) => ReassignmentInProgress
-      case None =>
-        // check if the current replica assignment matches the expected one 
after reassignment
-        val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topicAndPartition.topic, topicAndPartition.partition))
-        if(assignedReplicas == newReplicas)
-          ReassignmentCompleted
-        else {
-          println(("ERROR: Assigned replicas (%s) don't match the list of 
replicas for reassignment (%s)" +
-            " for partition %s").format(assignedReplicas.mkString(","), 
newReplicas.mkString(","), topicAndPartition))
-          ReassignmentFailed
-        }
-    }
-  }
-
   def validateAndParseArgs(args: Array[String]): 
ReassignPartitionsCommandOptions = {
     val opts = new ReassignPartitionsCommandOptions(args)
 
@@ -559,7 +551,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
   private[admin] def assignThrottledReplicas(existingPartitionAssignment: 
Map[TopicPartition, Seq[Int]],
                                              proposedPartitionAssignment: 
Map[TopicPartition, Seq[Int]],
                                              adminZkClient: AdminZkClient): 
Unit = {
-    for (topic <- proposedPartitionAssignment.keySet.map(_.topic).toSeq) {
+    for (topic <- 
proposedPartitionAssignment.keySet.map(_.topic).toSeq.distinct) {
       val existingPartitionAssignmentForTopic = 
existingPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
       val proposedPartitionAssignmentForTopic = 
proposedPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
 
@@ -621,7 +613,10 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
   def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 
10000L): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = proposedPartitionAssignment.filter { case (p, _) 
=> validatePartition(zkClient, p.topic, p.partition) }
+      val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
+        .flatMap { case (topic, topicPartitionReplicas) =>
+          validatePartition(zkClient, topic, topicPartitionReplicas)
+        }
       if (validPartitions.isEmpty) false
       else {
         if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
@@ -655,21 +650,24 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
     }
   }
 
-  def validatePartition(zkClient: KafkaZkClient, topic: String, partition: 
Int): Boolean = {
+  def validatePartition(zkClient: KafkaZkClient, topic: String, 
topicPartitionReplicas: Map[TopicPartition, Seq[Int]])
+  :Map[TopicPartition, Seq[Int]] = {
     // check if partition exists
     val partitionsOpt = 
zkClient.getPartitionsForTopics(immutable.Set(topic)).get(topic)
-    partitionsOpt match {
-      case Some(partitions) =>
-        if(partitions.contains(partition)) {
-          true
-        } else {
-          error("Skipping reassignment of partition [%s,%d] ".format(topic, 
partition) +
-            "since it doesn't exist")
+    topicPartitionReplicas.filter { case (topicPartition, _) =>
+      partitionsOpt match {
+        case Some(partitions) =>
+          if (partitions.contains(topicPartition.partition())) {
+            true
+          } else {
+            error("Skipping reassignment of partition [%s,%d] ".format(topic, 
topicPartition.partition()) +
+              "since it doesn't exist")
+            false
+          }
+        case None => error("Skipping reassignment of partition " +
+          "[%s,%d] since topic %s doesn't exist".format(topic, 
topicPartition.partition(), topic))
           false
-        }
-      case None => error("Skipping reassignment of partition " +
-        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
-        false
+      }
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4d089b36b1f..12fb4791f2b 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -132,9 +132,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     assertTrue("Partition reassignment should fail for [test,0]", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = zkClient.getPartitionReassignment
-      
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicPartition,
-        Map(topicPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentFailed
+      
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicPartition -> newReplicas))
+        .getOrElse(topicPartition, fail(s"Failed to get reassignment status 
for $topicPartition")) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkClient.getControllerId.getOrElse(fail("Controller 
doesn't exist"))
     val controller = servers.filter(s => s.config.brokerId == 
controllerId).head
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 6978f8dc7b7..213c23aff4d 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -439,9 +439,8 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment attempt failed for [test, 0]", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentCompleted
+        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment 
status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topic, partitionToBeReassigned))
@@ -469,9 +468,8 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentCompleted
+        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment 
status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topic, partitionToBeReassigned))
@@ -498,9 +496,8 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentCompleted
+        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment 
status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topic, partitionToBeReassigned))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid redundant requests to zookeeper when reassign topic partition
> -------------------------------------------------------------------
>
>                 Key: KAFKA-5928
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5928
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>    Affects Versions: 0.10.2.1, 0.11.0.0, 1.0.0, 2.0.0
>            Reporter: Genmao Yu
>            Priority: Major
>             Fix For: 2.1.0
>
>
> We mistakenly request topic-level information once for every partition entry 
> in the assignment JSON file. For example, see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550:
>  {code}
> val validPartitions = proposedPartitionAssignment.filter { case (p, _) => 
> validatePartition(zkUtils, p.topic, p.partition) } 
> {code} 
> If we reassign 1000 partitions (in 10 topics), we send 1000 requests to 
> ZooKeeper here, although only 10 requests (one per topic) are actually needed. 
> We tested a large-scale reassignment of about 10K partitions, and it took tens 
> of minutes. With this optimization, that is reduced to less than 1 minute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to