ijuma commented on a change in pull request #8891: URL: https://github.com/apache/kafka/pull/8891#discussion_r502724870
########## File path: core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala ########## @@ -162,6 +162,39 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { localLogOrException(part).highWatermark) } + @Test + def testAlterReassignmentThrottle(): Unit = { + cluster = new ReassignPartitionsTestCluster(zkConnect) + cluster.setup() + cluster.produceMessages("foo", 0, 50) + cluster.produceMessages("baz", 2, 60) + val assignment = """{"version":1,"partitions":""" + Review comment: Nit: if you're using triple quotes, you don't need to end them at every line. ########## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ########## @@ -1213,22 +1211,38 @@ object ReassignPartitionsCommand extends Logging { * @return A map from partition objects to error strings. */ def cancelPartitionReassignments(adminClient: Admin, - reassignments: Set[TopicPartition]) + reassignments: Set[TopicPartition]) : Map[TopicPartition, Throwable] = { val results: Map[TopicPartition, KafkaFuture[Void]] = adminClient.alterPartitionReassignments(reassignments.map { (_, (None: Option[NewPartitionReassignment]).asJava) }.toMap.asJava).values().asScala results.flatMap { - case (part, future) => { + case (part, future) => Review comment: Nit: this can go on the line above? ########## File path: core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala ########## @@ -407,17 +423,90 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { waitForBrokerLevelThrottles(unthrottledBrokerConfigs) // Wait for the directory movement to complete. - waitForVerifyAssignment(cluster.adminClient, assignment, true, + waitForVerifyAssignment(cluster.adminClient, reassignment.json, true, VerifyAssignmentResult(Map( - new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 2), true) + topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 2), true) ), false, Map( - new TopicPartitionReplica("foo", 0, 0) -> CompletedMoveState(newFoo1Dir) + new TopicPartitionReplica(topicPartition.topic, topicPartition.partition, 0) -> + CompletedMoveState(reassignment.targetDir) ), false)) val info1 = new BrokerDirs(cluster.adminClient.describeLogDirs(0.to(4). map(_.asInstanceOf[Integer]).asJavaCollection), 0) - assertEquals(newFoo1Dir, - info1.curLogDirs.getOrElse(new TopicPartition("foo", 0), "")) + assertEquals(reassignment.targetDir, + info1.curLogDirs.getOrElse(topicPartition, "")) + } + + @Test + def testAlterLogDirReassignmentThrottle(): Unit = { + val topicPartition = new TopicPartition("foo", 0) + + cluster = new ReassignPartitionsTestCluster(zkConnect) + cluster.setup() + cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700) + + val targetBrokerId = 0 + val replicas = Seq(0, 1, 2) + val reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas) + + // Start the replica move with a low throttle so it does not complete + val initialLogDirThrottle = 1L + runExecuteAssignment(cluster.adminClient, false, reassignment.json, + interBrokerThrottle = -1L, initialLogDirThrottle) + waitForLogDirThrottle(Set(0), initialLogDirThrottle) + + // Now increase the throttle and verify that the log dir movement completes + val updatedLogDirThrottle = 3000000L + runExecuteAssignment(cluster.adminClient, additional = true, reassignment.json, + interBrokerThrottle = -1L, replicaAlterLogDirsThrottle = updatedLogDirThrottle) + waitForLogDirThrottle(Set(0), updatedLogDirThrottle) + + waitForVerifyAssignment(cluster.adminClient, reassignment.json, true, + VerifyAssignmentResult(Map( + topicPartition -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 2), true) + ), false, Map( + new TopicPartitionReplica(topicPartition.topic, topicPartition.partition, targetBrokerId) -> + CompletedMoveState(reassignment.targetDir) + ), false)) + } + + case class LogDirReassignment(json: String, currentDir: String, targetDir: String) + + private def buildLogDirReassignment(topicPartition: TopicPartition, + brokerId: Int, + replicas: Seq[Int]): LogDirReassignment = { + + val describeLogDirsResult = cluster.adminClient.describeLogDirs( + 0.to(4).map(_.asInstanceOf[Integer]).asJavaCollection) + + val logDirInfo = new BrokerDirs(describeLogDirsResult, brokerId) + assertTrue(logDirInfo.futureLogDirs.isEmpty) + + val currentDir = logDirInfo.curLogDirs(topicPartition) + val newDir = logDirInfo.logDirs.find(!_.equals(currentDir)).get + + val logDirs = replicas.map { replicaId => + if (replicaId == brokerId) + "\"%s\"".format(newDir) Review comment: Nit: can we just use string interpolation? ########## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ########## @@ -1213,22 +1211,38 @@ object ReassignPartitionsCommand extends Logging { * @return A map from partition objects to error strings. */ def cancelPartitionReassignments(adminClient: Admin, - reassignments: Set[TopicPartition]) + reassignments: Set[TopicPartition]) : Map[TopicPartition, Throwable] = { val results: Map[TopicPartition, KafkaFuture[Void]] = adminClient.alterPartitionReassignments(reassignments.map { (_, (None: Option[NewPartitionReassignment]).asJava) }.toMap.asJava).values().asScala results.flatMap { - case (part, future) => { + case (part, future) => try { future.get() None } catch { case t: ExecutionException => Some(part, t.getCause()) } + } + } + + private def calculateCurrentMoveMap(currentReassignments: Map[TopicPartition, PartitionReassignment]): MoveMap = { + val moveMap = new mutable.HashMap[String, mutable.Map[Int, PartitionMove]]() + // Add the current reassignments to the move map. + currentReassignments.foreach { case (part, reassignment) => + val move = PartitionMove(new mutable.HashSet[Int](), new mutable.HashSet[Int]()) + reassignment.replicas.forEach { + replica => move.sources += replica Review comment: Nit: move `replica` to the previous line? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org