junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r621652737
##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or leader
+     * changes if necessary.
+     *
+     * @param context         A human-readable context string used in log4j logging.
+     * @param brokerToRemove  NO_LEADER if no broker is being removed; the ID of the
+     *                        broker to remove from the ISR and leadership, otherwise.
+     * @param brokerToAdd     NO_LEADER if no broker is being added; the ID of the
+     *                        broker which is now eligible to be a leader, otherwise.
+     * @param records         A list of records which we will append to.
+     * @param iterator        The iterator containing the partitions to examine.
+     */
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemove,
+                                     int brokerToAdd,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        Function<Integer, Boolean> isAcceptableLeader =
+            r -> r == brokerToAdd || clusterControl.unfenced(r);
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() +
+                    " existed in isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+            int newLeader;
+            if (isGoodLeader(newIsr, partition.leader)) {
+                // If the current leader is good, don't change.
+                newLeader = partition.leader;
+            } else {
+                // Choose a new leader.
+                boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+                newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader);

Review comment:
   The reason we want to remove the shutting-down replicas from the ISR is to optimize for latency. If we don't do that, then when the broker actually shuts down, it can block the producer for up to replica.lag.time.max.ms before the replica can be taken out of the ISR. So I think this optimization is still useful.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -356,7 +367,9 @@ public void replay(PartitionChangeRecord record) {
         brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartitionInfo.isr,
             newPartitionInfo.isr, prevPartitionInfo.leader, newPartitionInfo.leader);
-        log.debug("Applied ISR change record: {}", record.toString());
+        String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +

Review comment:
   In the old controller, we bump up the leader epoch if the ISR is changed during controlled shutdown. This helps prevent the shutting-down broker from being added back to the ISR. In the raft controller, we bump up the partitionEpoch when the ISR is changed. Do we plan to fence a fetch request with an unmatched partitionEpoch to achieve the same logic? If so, do we have a JIRA to track that?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
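
For reference, a simplified, self-contained sketch of the idea behind the generateLeaderAndIsrUpdates hunk above: when a broker begins controlled shutdown, drop it from the ISR right away and move leadership if the current leader is the one going away. The helper names (copyWithout, chooseLeader) and the acceptability check are stand-ins for illustration, not the controller's actual classes or election logic.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntPredicate;

public class IsrShrinkSketch {

    /** Return a copy of the ISR with the given broker removed. */
    static int[] copyWithout(int[] isr, int brokerToRemove) {
        return Arrays.stream(isr).filter(b -> b != brokerToRemove).toArray();
    }

    /**
     * Keep the current leader if it is still in the new ISR and acceptable;
     * otherwise pick the first replica that is in the new ISR and acceptable;
     * otherwise report no leader (-1).
     */
    static int chooseLeader(int[] replicas, int[] newIsr, int currentLeader,
                            IntPredicate isAcceptableLeader) {
        Set<Integer> isrSet = new HashSet<>();
        for (int b : newIsr) isrSet.add(b);
        if (isrSet.contains(currentLeader) && isAcceptableLeader.test(currentLeader)) {
            return currentLeader;
        }
        for (int replica : replicas) {
            if (isrSet.contains(replica) && isAcceptableLeader.test(replica)) {
                return replica;
            }
        }
        return -1;  // no eligible leader
    }

    public static void main(String[] args) {
        int[] replicas = {1, 2, 3};
        int[] isr = {1, 2, 3};
        int shuttingDown = 1;  // the current leader is shutting down
        int[] newIsr = copyWithout(isr, shuttingDown);
        // Treat every broker except the shutting-down one as acceptable here.
        int newLeader = chooseLeader(replicas, newIsr, shuttingDown, b -> b != shuttingDown);
        System.out.println("new ISR: " + Arrays.toString(newIsr) + ", new leader: " + newLeader);
        // Prints: new ISR: [2, 3], new leader: 2
    }
}

Doing this proactively is the latency optimization described in the first review comment: the producer does not have to wait for the replica to fall out of the ISR on its own after the broker stops fetching.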
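
The second review comment asks whether a fetch carrying a stale partitionEpoch will be fenced. Purely as an illustration of that question, the sketch below rejects a fetch whose epochs do not match the broker's current view; the types and error names (PartitionState, FetchError, STALE_PARTITION_EPOCH) are hypothetical and do not reflect Kafka's actual request handling.

final class PartitionState {
    final int leaderEpoch;
    final int partitionEpoch;

    PartitionState(int leaderEpoch, int partitionEpoch) {
        this.leaderEpoch = leaderEpoch;
        this.partitionEpoch = partitionEpoch;
    }
}

enum FetchError { NONE, FENCED_LEADER_EPOCH, STALE_PARTITION_EPOCH }

final class FetchFencingSketch {
    /** Reject fetches whose epochs do not match the current partition state. */
    static FetchError validate(PartitionState current,
                               int requestLeaderEpoch,
                               int requestPartitionEpoch) {
        if (requestLeaderEpoch != current.leaderEpoch) {
            return FetchError.FENCED_LEADER_EPOCH;
        }
        if (requestPartitionEpoch != current.partitionEpoch) {
            // The ISR changed (for example, the follower was removed during
            // controlled shutdown) since the follower last refreshed metadata.
            return FetchError.STALE_PARTITION_EPOCH;
        }
        return FetchError.NONE;
    }

    public static void main(String[] args) {
        PartitionState current = new PartitionState(5, 12);
        System.out.println(validate(current, 5, 12)); // NONE
        System.out.println(validate(current, 5, 11)); // STALE_PARTITION_EPOCH
    }
}

If fencing works this way, bumping only the partitionEpoch on an ISR change could serve the same purpose as the old controller's leader-epoch bump: a shutting-down broker fetching with the stale epoch would be rejected rather than re-added to the ISR.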