hachikuji commented on a change in pull request #9807:
URL: https://github.com/apache/kafka/pull/9807#discussion_r552892702
##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -1229,19 +1229,24 @@ object ReassignPartitionsCommand extends Logging {
}
}
+ /**
+ * Compute the in-progress partition moves from the current reassignments.
+ * @param currentReassignments All replicas, adding replicas and removing
replicas of target partitions
+ */
private def calculateCurrentMoveMap(currentReassignments:
Map[TopicPartition, PartitionReassignment]): MoveMap = {
val moveMap = new mutable.HashMap[String, mutable.Map[Int,
PartitionMove]]()
// Add the current reassignments to the move map.
currentReassignments.foreach { case (part, reassignment) =>
- val move = PartitionMove(new mutable.HashSet[Int](), new
mutable.HashSet[Int]())
- reassignment.replicas.forEach { replica =>
- move.sources += replica
- move.destinations += replica
- }
- reassignment.addingReplicas.forEach(move.destinations += _)
- reassignment.removingReplicas.forEach(move.destinations -= _)
val partMoves = moveMap.getOrElseUpdate(part.topic, new
mutable.HashMap[Int, PartitionMove])
- partMoves.put(part.partition, move)
+
+ // During a reassignment, replicas already includes addingReplicas, so remove them to get the sources
+ val sources = mutable.Set[Int]() ++
reassignment.replicas().asScala.map(Int.unbox)
+ .diff(reassignment.addingReplicas.asScala.map(Int.unbox))
Review comment:
nit: not a big deal for a tool, but we may as well extract a variable
for `reassignment.addingReplicas.asScala.map(Int.unbox)` since we use it twice
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]