junrao commented on a change in pull request #11893: URL: https://github.com/apache/kafka/pull/11893#discussion_r829550657
########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. - if (!targetIsr.contains(partition.leader)) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; + + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } + } - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. 
+ */ + ElectionResult electLeader() { + if (election == Election.PREFERRED) { + return electPreferredLeader(); + } - return false; + return electAnyLeader(); } - class BestLeader { - final int node; - final boolean unclean; + /** + * Assumes that the election type is Election.PREFERRED + */ + private ElectionResult electPreferredLeader() { + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + Optional<Integer> onlineLeader = targetReplicas.stream() Review comment: Preferred leader election is an optimization. If we can't move the leader to the preferred one, it seems there is no need to do anything extra. ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -104,53 +117,94 @@ public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) { return this; } - boolean shouldTryElection() { - // If the new isr doesn't have the current leader, we need to try to elect a new - // one. Note: this also handles the case where the current leader is NO_LEADER, - // since that value cannot appear in targetIsr. 
- if (!targetIsr.contains(partition.leader)) return true; + // VisibleForTesting + static class ElectionResult { + final int node; + final boolean unclean; + + private ElectionResult(int node, boolean unclean) { + this.node = node; + this.unclean = unclean; + } + } - // Check if we want to try to get away from a non-preferred leader. - if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + // VisibleForTesting + /** + * Perform leader election based on the partition state and leader election type. + * + * See documentation for the Election type to see more details on the election types supported. + */ + ElectionResult electLeader() { + if (election == Election.PREFERRED) { + return electPreferredLeader(); + } - return false; + return electAnyLeader(); } - class BestLeader { - final int node; - final boolean unclean; + /** + * Assumes that the election type is Election.PREFERRED + */ + private ElectionResult electPreferredLeader() { + int preferredReplica = targetReplicas.get(0); + if (targetIsr.contains(preferredReplica) && isAcceptableLeader.apply(preferredReplica)) { + return new ElectionResult(preferredReplica, false); + } - BestLeader() { - for (int replica : targetReplicas) { - if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = false; - return; - } - } - if (uncleanElectionOk.get()) { - for (int replica : targetReplicas) { - if (isAcceptableLeader.apply(replica)) { - this.node = replica; - this.unclean = true; - return; - } - } + if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { + // Don't consider a new leader since the current leader meets all the constraints + return new ElectionResult(partition.leader, false); + } + + Optional<Integer> onlineLeader = targetReplicas.stream() + .skip(1) + .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) + .findFirst(); + if (onlineLeader.isPresent()) { + 
                 return new ElectionResult(onlineLeader.get(), false); +        } + +        return new ElectionResult(NO_LEADER, false); +    } + +    /** +     * Assumes that the election type is either Election.ONLINE or Election.UNCLEAN +     */ +    private ElectionResult electAnyLeader() { +        if (targetIsr.contains(partition.leader) && isAcceptableLeader.apply(partition.leader)) { +            // Don't consider a new leader since the current leader meets all the constraints +            return new ElectionResult(partition.leader, false); +        } + +        Optional<Integer> onlineLeader = targetReplicas.stream() +            .filter(replica -> targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) +            .findFirst(); +        if (onlineLeader.isPresent()) { +            return new ElectionResult(onlineLeader.get(), false); +        } + +        if (election == Election.UNCLEAN) { +            // 5. Attempt unclean leader election   Review comment:      The step numbering has changed, so the "5." prefix in this comment is no longer accurate and should be removed.       --  This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.  To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org  For queries about this service, please contact Infrastructure at: us...@infra.apache.org