hachikuji commented on a change in pull request #9807:
URL: https://github.com/apache/kafka/pull/9807#discussion_r552339407
##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -1250,38 +1252,36 @@ object ReassignPartitionsCommand extends Logging {
* Calculate the global map of all partitions that are moving.
*
* @param currentReassignments The currently active reassignments.
- * @param proposedReassignments The proposed reassignments (destinations replicas only).
+ * @param proposedParts The proposed location of the partitions (destinations replicas only).
* @param currentParts The current location of the partitions that we are
* proposing to move.
* @return A map from topic name to partition map.
* The partition map is keyed on partition index and contains
* the movements for that partition.
*/
def calculateProposedMoveMap(currentReassignments: Map[TopicPartition, PartitionReassignment],
Review comment:
It might be just me, but the logic in this method is as clear as mud to
me. At a high level, we are just trying to identify the sources and the
destinations to fill a `PartitionMove`. The destinations are the adding
replicas (AR) and the sources are the current replicas (CR) without the adding
replicas. The current logic below first calls `calculateCurrentMoveMap` which
does the following:
1. Add each replica to both sources and destinations
2. Add all AR to destinations
3. Remove all removing replicas (RR) from destinations
I think this computes the destinations correctly (even if it does so in an
odd way), but sources will include all replicas, which is wrong. You have fixed
this by removing AR from sources explicitly, but I think we can simplify a bit
more. Maybe something like this:
```scala
move.destinations = reassignment.addingReplicas
move.sources = reassignment.replicas - reassignment.addingReplicas
```
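To make the two-line suggestion above concrete, here is a minimal self-contained sketch. `Reassignment` and `Move` are hypothetical stand-ins for `PartitionReassignment` and `PartitionMove` (the real classes are not reproduced here), and the sketch assumes `replicas` already contains the adding replicas while a reassignment is in progress.

```scala
object CurrentMoveSketch {
  // Hypothetical stand-ins for illustration only, not the real Kafka classes.
  final case class Reassignment(replicas: Seq[Int],
                                addingReplicas: Seq[Int],
                                removingReplicas: Seq[Int])
  final case class Move(sources: Set[Int], destinations: Set[Int])

  // destinations = AR, sources = CR - AR
  def currentMove(r: Reassignment): Move = {
    val adding = r.addingReplicas.toSet
    Move(sources = r.replicas.toSet -- adding, destinations = adding)
  }
}
```

For an in-progress reassignment with replicas [1, 2, 3], AR = [3] and RR = [1], this yields sources {1, 2} and destinations {3}.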
After computing the current move map, the current logic tries to account for
the proposed reassignments. It does the following:
1. Overwrite current move with an empty move and set destinations to the
target replicas (TR)
2. Iterate through all moves and check if sources is empty, which means we
have overwritten the move in step 1. If so, then we add all of CR to sources.
3. Now we do one more pass over the moves and remove all of the destinations
from sources.
I have a hard time making sense of this logic. The main problem is that it
assumes that TR are only destinations, but that is wrong. For example, if we
reassign [1, 2] to [1, 3], then 1 should be a source, not a destination even
though it is present in TR. I think the correct logic should be the following:
1. For each proposed assignment TR, check whether there is already a
reassignment in progress for that partition.
2. If there is a reassignment in progress, then we need to account for the
current AR. We can compute sources as CR - AR, which is what we already did
when calculating the current move map.
3. If there is no reassignment in progress, then we set sources to CR.
4. Regardless of whether there is a reassignment in progress, we set
destinations to TR - sources.
Something like this?
```scala
proposedParts.foreach { case (part, targetReplicas) =>
  val partMoves = moveMap.getOrElseUpdate(part.topic, new mutable.HashMap[Int, PartitionMove])
  // partMoves is keyed on the partition index, so look up part.partition
  val sources = partMoves.get(part.partition) match {
    case Some(move) => move.sources   // reassignment in progress: CR - AR
    case None => currentParts(part)   // no reassignment in progress: CR
  }
  // set difference: TR - sources
  val destinations = targetReplicas - sources
  partMoves.put(part.partition, PartitionMove(sources, destinations))
}
```
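Here is the same idea as a self-contained sketch that can be run outside the Kafka code base. `TopicPartition`, `PartitionMove`, `MoveMap`, `emptyMoveMap` and `applyProposed` are simplified, hypothetical stand-ins (immutable sets instead of the real field types), and falling back to an empty source set when `currentParts` has no entry is my assumption, not something the existing code does.

```scala
import scala.collection.mutable

object ProposedMoveSketch {
  // Hypothetical stand-ins for illustration only, not the real Kafka classes.
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionMove(sources: Set[Int], destinations: Set[Int])
  type MoveMap = mutable.Map[String, mutable.Map[Int, PartitionMove]]

  def emptyMoveMap(): MoveMap =
    mutable.HashMap.empty[String, mutable.Map[Int, PartitionMove]]

  // moveMap is expected to already hold the moves of in-progress reassignments.
  def applyProposed(moveMap: MoveMap,
                    proposedParts: Map[TopicPartition, Seq[Int]],
                    currentParts: Map[TopicPartition, Seq[Int]]): MoveMap = {
    proposedParts.foreach { case (part, targetReplicas) =>
      val partMoves =
        moveMap.getOrElseUpdate(part.topic, mutable.HashMap.empty[Int, PartitionMove])
      val sources: Set[Int] = partMoves.get(part.partition) match {
        // Reassignment in progress: reuse its sources (CR - AR).
        case Some(move) => move.sources
        // No reassignment in progress: sources are CR.
        // Assumption: an unknown partition is treated as having no replicas.
        case None => currentParts.getOrElse(part, Seq.empty).toSet
      }
      // Destinations are TR - sources in both cases.
      val destinations = targetReplicas.toSet -- sources
      partMoves.put(part.partition, PartitionMove(sources, destinations))
    }
    moveMap
  }
}
```

With the example from above, reassigning [1, 2] to [1, 3] with nothing in progress gives sources {1, 2} and destinations {3}, so replica 1 ends up as a source and not as a destination.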
Check that over and see if it makes sense to you. Basically I think the
current logic is unnecessarily complicated and probably wrong in multiple ways.