junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r618761439



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or leader
+     * changes if necessary.
+     *
+     * @param context           A human-readable context string used in log4j logging.
+     * @param brokerToRemove    NO_LEADER if no broker is being removed; the ID of the

Review comment:
       What about this place? Instead of relying on NO_LEADER, it might be
clearer to make brokerToRemove and brokerToAdd optional ints.
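
A minimal sketch of what the optional-int variant could look like, assuming
java.util.OptionalInt is used for both parameters. The class name
OptionalBrokerSketch and both helper methods are hypothetical stand-ins written
for illustration; they are not the PR's code and not Kafka's Replicas API.

```java
import java.util.Arrays;
import java.util.OptionalInt;
import java.util.function.IntPredicate;

// Hypothetical illustration only: none of these names come from the PR.
public class OptionalBrokerSketch {

    // Returns a copy of the ISR without the given broker.
    // An empty OptionalInt means "no broker is being removed".
    static int[] copyWithout(int[] isr, OptionalInt brokerToRemove) {
        if (!brokerToRemove.isPresent()) {
            return isr.clone();
        }
        int id = brokerToRemove.getAsInt();
        return Arrays.stream(isr).filter(r -> r != id).toArray();
    }

    // A replica is an acceptable leader if it is the broker being added
    // or if the (assumed) unfenced predicate accepts it.
    static boolean isAcceptableLeader(int replica,
                                      OptionalInt brokerToAdd,
                                      IntPredicate unfenced) {
        boolean isAdded = brokerToAdd.isPresent() && brokerToAdd.getAsInt() == replica;
        return isAdded || unfenced.test(replica);
    }

    public static void main(String[] args) {
        int[] isr = {1, 2, 3};
        // Removing broker 2 versus removing nothing.
        System.out.println(Arrays.toString(copyWithout(isr, OptionalInt.of(2))));
        System.out.println(Arrays.toString(copyWithout(isr, OptionalInt.empty())));
        // Broker 4 is fenced here but is the one being added.
        IntPredicate unfenced = r -> r == 1 || r == 3;
        System.out.println(isAcceptableLeader(4, OptionalInt.of(4), unfenced));
    }
}
```

With this shape, a caller passes OptionalInt.empty() rather than a sentinel, so
a real broker ID cannot be mistaken for "no broker".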

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or leader
+     * changes if necessary.
+     *
+     * @param context           A human-readable context string used in log4j logging.
+     * @param brokerToRemove    NO_LEADER if no broker is being removed; the ID of the
+     *                          broker to remove from the ISR and leadership, otherwise.
+     * @param brokerToAdd       NO_LEADER if no broker is being added; the ID of the
+     *                          broker which is now eligible to be a leader, otherwise.
+     * @param records           A list of records which we will append to.
+     * @param iterator          The iterator containing the partitions to examine.
+     */
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemove,
+                                     int brokerToAdd,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        Function<Integer, Boolean> isAcceptableLeader =
+            r -> r == brokerToAdd || clusterControl.unfenced(r);
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() +
+                        " existed in isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+            int newLeader;
+            if (isGoodLeader(newIsr, partition.leader)) {
+                // If the current leader is good, don't change.
+                newLeader = partition.leader;
+            } else {
+                // Choose a new leader.
+                boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+                newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader);

Review comment:
       Yes, it probably doesn't make a big difference, and it happens rarely.
So we could just keep the logic in this PR.





