jsancio commented on code in PR #18240: URL: https://github.com/apache/kafka/pull/18240#discussion_r1896920647
########## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ########## @@ -299,13 +205,12 @@ public boolean canGrantVote( @Override public String toString() { return String.format( - "CandidateState(localId=%d, localDirectoryId=%s,epoch=%d, retries=%d, voteStates=%s, " + + "Candidate(localId=%d, localDirectoryId=%s, epoch=%d, voteStates=%s, " + Review Comment: The type should be `CandidateState`. Let's add the field `retries` back to the message. How about replacing `voteStates` with `epochElection` and implementing a `toString` method for `EpochElection`? ########## raft/src/main/java/org/apache/kafka/raft/FollowerState.java: ########## @@ -75,7 +79,7 @@ public FollowerState( @Override public ElectionState election() { - return ElectionState.withElectedLeader(epoch, leaderId, voters); + return new ElectionState(epoch, OptionalInt.of(leaderId), votedKey, voters); Review Comment: Let's revert this change. We should make it clear that this PR is not changing the persisted `quorum-state`. ########## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ########## @@ -114,137 +99,58 @@ public boolean isBackingOff() { return isBackingOff; } - public int retries() { - return retries; - } - /** - * Check whether we have received enough votes to conclude the election and become leader. - * - * @return true if at least a majority of nodes have granted the vote + * Record the current election has failed since we've either received sufficient rejecting voters or election timed out */ - public boolean isVoteGranted() { - return numGranted() >= majoritySize(); + public void startBackingOff(long currentTimeMs, long backoffDurationMs) { + this.backoffTimer.update(currentTimeMs); + this.backoffTimer.reset(backoffDurationMs); + this.isBackingOff = true; } - /** - * Check if we have received enough rejections that it is no longer possible to reach a
- * - * @return true if the vote is rejected, false if the vote is already or can still be granted - */ - public boolean isVoteRejected() { - return numGranted() + numUnrecorded() < majoritySize(); + public boolean isBackoffComplete(long currentTimeMs) { + backoffTimer.update(currentTimeMs); + return backoffTimer.isExpired(); } - /** - * Record a granted vote from one of the voters. - * - * @param remoteNodeId The id of the voter - * @return true if the voter had not been previously recorded - * @throws IllegalArgumentException if the remote node is not a voter or if the vote had already been - * rejected by this node - */ + public long remainingBackoffMs(long currentTimeMs) { + if (!isBackingOff) { + throw new IllegalStateException("Candidate is not currently backing off"); + } + backoffTimer.update(currentTimeMs); + return backoffTimer.remainingMs(); + } Review Comment: Let's move this method so that it doesn't show in the diff. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -679,8 +706,26 @@ private void transitionToCandidate(long currentTimeMs) { onBecomeCandidate(currentTimeMs); } + private void onBecomeProspective(long currentTimeMs) { + ProspectiveState state = quorum.prospectiveStateOrThrow(); + if (!maybeTransitionToCandidate(state, currentTimeMs)) { + resetConnections(); + kafkaRaftMetrics.updateElectionStartMs(currentTimeMs); + } + } + + private void transitionToProspective(long currentTimeMs) { + quorum.transitionToProspective(); + maybeFireLeaderChange(); + onBecomeProspective(currentTimeMs); + } + private void transitionToUnattached(int epoch) { - quorum.transitionToUnattached(epoch); + transitionToUnattached(epoch, OptionalInt.empty()); + } Review Comment: Let's remove this and update the callers to use `transitionToUnattached(int, OptionalInt)`.
########## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ########## @@ -114,137 +99,58 @@ public boolean isBackingOff() { return isBackingOff; } - public int retries() { - return retries; - } - /** - * Check whether we have received enough votes to conclude the election and become leader. - * - * @return true if at least a majority of nodes have granted the vote + * Record the current election has failed since we've either received sufficient rejecting voters or election timed out */ - public boolean isVoteGranted() { - return numGranted() >= majoritySize(); + public void startBackingOff(long currentTimeMs, long backoffDurationMs) { + this.backoffTimer.update(currentTimeMs); + this.backoffTimer.reset(backoffDurationMs); + this.isBackingOff = true; } - /** - * Check if we have received enough rejections that it is no longer possible to reach a - * majority of grants. - * - * @return true if the vote is rejected, false if the vote is already or can still be granted - */ - public boolean isVoteRejected() { - return numGranted() + numUnrecorded() < majoritySize(); + public boolean isBackoffComplete(long currentTimeMs) { + backoffTimer.update(currentTimeMs); + return backoffTimer.isExpired(); Review Comment: Let's move this method so that it doesn't show in the diff. 
########## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ########## @@ -85,26 +80,16 @@ protected CandidateState( this.backoffTimer = time.timer(0); this.log = logContext.logger(CandidateState.class); - for (ReplicaKey voter : voters.voterKeys()) { - voteStates.put(voter.id(), new VoterState(voter)); - } - voteStates.get(localId).setState(State.GRANTED); + this.epochElection = new EpochElection(voters.voterKeys()); + epochElection.recordVote(localId, true); } public int localId() { return localId; } - public int majoritySize() { - return voteStates.size() / 2 + 1; - } - - private long numGranted() { - return votersInState(State.GRANTED).count(); - } - - private long numUnrecorded() { - return votersInState(State.UNRECORDED).count(); + public int retries() { + return retries; Review Comment: Let's move this method so that it doesn't show in the diff. ########## raft/src/main/java/org/apache/kafka/raft/CandidateState.java: ########## @@ -114,137 +99,58 @@ public boolean isBackingOff() { return isBackingOff; } - public int retries() { - return retries; - } - /** - * Check whether we have received enough votes to conclude the election and become leader. - * - * @return true if at least a majority of nodes have granted the vote + * Record the current election has failed since we've either received sufficient rejecting voters or election timed out */ - public boolean isVoteGranted() { - return numGranted() >= majoritySize(); + public void startBackingOff(long currentTimeMs, long backoffDurationMs) { + this.backoffTimer.update(currentTimeMs); + this.backoffTimer.reset(backoffDurationMs); + this.isBackingOff = true; Review Comment: Let's move this method so that it doesn't show in the diff. 
########## raft/src/main/java/org/apache/kafka/raft/FollowerState.java: ########## @@ -118,6 +122,10 @@ public Node leaderNode(ListenerName listener) { ); } + public Optional<ReplicaKey> votedKey() { + return votedKey; + } + Review Comment: I got the impression that you don't use this in `src/main`. If so, let's remove it. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -368,14 +369,26 @@ public void transitionToResigned(List<ReplicaKey> preferredSuccessors) { } /** - * Transition to the "unattached" state. This means we have found an epoch greater than the current epoch, - * but we do not yet know of the elected leader. + * Transition to the "unattached" state. This means we have found an epoch greater than the current epoch + * and do not yet know of the elected leader, or we have transitioned from Prospective with the same epoch. + * Note, if we are transitioning from unattached and there is no epoch change, we take the path of + * unattachedAddVotedState instead. */ + // Used in testing public void transitionToUnattached(int epoch) { + transitionToUnattached(epoch, OptionalInt.empty()); + } Review Comment: Let's just remove this method and have the tests use `transitionToUnattached(int, OptionalInt)`. Also update the java doc to match the new signature and parameters. 
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -883,6 +928,13 @@ private boolean handleVoteResponse( VoteResponseData response = (VoteResponseData) responseMetadata.data(); Errors topLevelError = Errors.forCode(response.errorCode()); if (topLevelError != Errors.NONE) { + if (topLevelError == Errors.UNSUPPORTED_VERSION && quorum.isProspective()) { + logger.info("Prospective received unsupported version error in vote response in epoch {}, " + + "transitioning to Candidate state immediately since entire quorum may not support PreVote.", + quorum.epoch()); + transitionToCandidate(currentTimeMs); + return true; + } return handleTopLevelError(topLevelError, responseMetadata); Review Comment: How about: ```java if (topLevelError == Errors.UNSUPPORTED_VERSION && quorum.isProspective()) { logger.info( "Prospective received unsupported version error in vote response in epoch {}, " + "transitioning to Candidate state immediately since entire quorum may not support PreVote.", quorum.epoch() ); transitionToCandidate(currentTimeMs); return true; } else if (topLevelError != Errors.NONE) { return handleTopLevelError(topLevelError, responseMetadata); ``` ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2442,10 +2503,10 @@ private void maybeTransition( " and epoch " + epoch + " which is inconsistent with current leader " + quorum.leaderId() + " and epoch " + quorum.epoch()); } else if (epoch > quorum.epoch()) { - if (leaderId.isPresent()) { + if (leaderId.isPresent() && !leaderEndpoints.isEmpty()) { Review Comment: I think this is too relaxed. The previous code assumed that if `leaderId.isPresent()` then `!leaderEndpoints.isEmpty()`. That should not change in this PR.
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -957,13 +998,33 @@ private boolean handleVoteResponse( } } + private void maybeCandidateStartBackingOff(long currentTimeMs) { + // If in candidate state and vote is rejected, go immediately to a random, exponential backoff. The + // backoff starts low to prevent needing to wait the entire election timeout when the vote + // result has already been determined. The randomness prevents the next election from being + // gridlocked with another nominee due to timing. The exponential aspect limits epoch churn when + // the replica has failed multiple elections in succession. + if (quorum.isCandidate()) { + CandidateState candidate = quorum.candidateStateOrThrow(); + if (candidate.epochElection().isVoteRejected() && !candidate.isBackingOff()) { + logger.info("Insufficient remaining votes to become leader (rejected by {}). " + + "We will backoff before retrying election again", candidate.epochElection().rejectingVoters()); Review Comment: Let's fix this formatting. See my other examples on how we try to format code in the raft module. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -957,13 +998,33 @@ private boolean handleVoteResponse( } } + private void maybeCandidateStartBackingOff(long currentTimeMs) { + // If in candidate state and vote is rejected, go immediately to a random, exponential backoff. The + // backoff starts low to prevent needing to wait the entire election timeout when the vote + // result has already been determined. The randomness prevents the next election from being + // gridlocked with another nominee due to timing. The exponential aspect limits epoch churn when + // the replica has failed multiple elections in succession.
+ if (quorum.isCandidate()) { + CandidateState candidate = quorum.candidateStateOrThrow(); + if (candidate.epochElection().isVoteRejected() && !candidate.isBackingOff()) { + logger.info("Insufficient remaining votes to become leader (rejected by {}). " + + "We will backoff before retrying election again", candidate.epochElection().rejectingVoters()); + + candidate.startBackingOff( + currentTimeMs, + binaryExponentialElectionBackoffMs(candidate.retries()) + ); + } + } + } + private int binaryExponentialElectionBackoffMs(int retries) { if (retries <= 0) { throw new IllegalArgumentException("Retries " + retries + " should be larger than zero"); } // upper limit exponential co-efficients at 20 to avoid overflow return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)), - quorumConfig.electionBackoffMaxMs()); + quorumConfig.electionBackoffMaxMs()); Review Comment: Same here. Let's fix the indentation since you are already changing this part of the code. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -926,29 +978,18 @@ private boolean handleVoteResponse( if (quorum.isLeader()) { logger.debug("Ignoring vote response {} since we already became leader for epoch {}", partitionResponse, quorum.epoch()); - } else if (quorum.isCandidate()) { - CandidateState state = quorum.candidateStateOrThrow(); + } else if (quorum.isNomineeState()) { + NomineeState state = quorum.nomineeStateOrThrow(); if (partitionResponse.voteGranted()) { state.recordGrantedVote(remoteNodeId); - maybeTransitionToLeader(state, currentTimeMs); + maybeTransitionForward(state, currentTimeMs); } else { state.recordRejectedVote(remoteNodeId); - - // If our vote is rejected, we go immediately to the random backoff. This - // ensures that we are not stuck waiting for the election timeout when the - // vote has become gridlocked. 
- if (state.isVoteRejected() && !state.isBackingOff()) { - logger.info("Insufficient remaining votes to become leader (rejected by {}). " + - "We will backoff before retrying election again", state.rejectingVoters()); - - state.startBackingOff( - currentTimeMs, - binaryExponentialElectionBackoffMs(state.retries()) - ); - } + maybeCandidateStartBackingOff(currentTimeMs); } } else { - logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}", + logger.debug("Ignoring vote response {} since we are no longer a VotingState " + + "(Prospective or Candidate) in epoch {}", partitionResponse, quorum.epoch()); Review Comment: Let's fix the indentation. In the raft module we use this formatting style: ```java logger.debug( "Ignoring vote response {} since we are no longer a VotingState " + "(Prospective or Candidate) in epoch {}", partitionResponse, quorum.epoch() ); ``` ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2442,10 +2503,10 @@ private void maybeTransition( " and epoch " + epoch + " which is inconsistent with current leader " + quorum.leaderId() + " and epoch " + quorum.epoch()); } else if (epoch > quorum.epoch()) { - if (leaderId.isPresent()) { + if (leaderId.isPresent() && !leaderEndpoints.isEmpty()) { transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs); } else { - transitionToUnattached(epoch); + transitionToUnattached(epoch, leaderId); Review Comment: When would raft hit this case? ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2760,7 +2822,7 @@ private VoteRequestData buildVoteRequest(ReplicaKey remoteVoter) { remoteVoter, endOffset.epoch(), endOffset.offset(), - false + isPreVote Review Comment: Minor, but you can just inline the expression: `quorum.isProspective()`. Also, the call site that calls this knows the `NomineeState` so it can use that information to determine if the request is a preVote request.
No need to query `QuorumState` for this information. For example, you can add `isPreVote` to the `NomineeState` interface and update the signature of this method to `buildVoteRequest(ReplicaKey, boolean)`. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -385,15 +398,17 @@ public void transitionToUnattached(int epoch) { electionTimeoutMs = candidateStateOrThrow().remainingElectionTimeMs(time.milliseconds()); } else if (isUnattached()) { electionTimeoutMs = unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds()); + } else if (isProspective() && !prospectiveStateOrThrow().epochElection().isVoteRejected()) { + electionTimeoutMs = prospectiveStateOrThrow().remainingElectionTimeMs(time.milliseconds()); } else { electionTimeoutMs = randomElectionTimeoutMs(); } durableTransitionTo(new UnattachedState( time, epoch, - OptionalInt.empty(), - Optional.empty(), + leaderId, + epoch == currentEpoch ? votedKey() : Optional.empty(), Review Comment: Let's write a comment explaining why this expression is needed. 
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3026,20 +3088,46 @@ private long pollCandidate(long currentTimeMs) { return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs); } else if (state.isBackingOff()) { if (state.isBackoffComplete(currentTimeMs)) { - logger.info("Re-elect as candidate after election backoff has completed"); - transitionToCandidate(currentTimeMs); + logger.info("Transition to prospective after election backoff has completed"); + transitionToProspective(currentTimeMs); return 0L; } return state.remainingBackoffMs(currentTimeMs); } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { - long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries()); - logger.info("Election has timed out, backing off for {}ms before becoming a candidate again", - backoffDurationMs); - state.startBackingOff(currentTimeMs, backoffDurationMs); - return backoffDurationMs; + logger.info("Election was not granted, transitioning to prospective"); + transitionToProspective(currentTimeMs); + return 0L; } else { + long minVoteRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); + return Math.min(minVoteRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); + } + } + + private long pollProspective(long currentTimeMs) { + ProspectiveState state = quorum.prospectiveStateOrThrow(); + GracefulShutdown shutdown = this.shutdown.get(); + + if (shutdown != null) { long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); - return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); + return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs); + } else if (state.epochElection().isVoteRejected() || state.hasElectionTimeoutExpired(currentTimeMs)) { Review Comment: This code shouldn't check `isVoteRejected` if it is handled in `handleVoteResponse`. I left a comment about this in that method.
This is an event-driven programming model. The event that causes the majority of the voters to reject the prospective state is received in the vote response, not when polling the prospective state. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -926,29 +978,18 @@ private boolean handleVoteResponse( if (quorum.isLeader()) { logger.debug("Ignoring vote response {} since we already became leader for epoch {}", partitionResponse, quorum.epoch()); - } else if (quorum.isCandidate()) { - CandidateState state = quorum.candidateStateOrThrow(); + } else if (quorum.isNomineeState()) { + NomineeState state = quorum.nomineeStateOrThrow(); if (partitionResponse.voteGranted()) { state.recordGrantedVote(remoteNodeId); - maybeTransitionToLeader(state, currentTimeMs); + maybeTransitionForward(state, currentTimeMs); } else { state.recordRejectedVote(remoteNodeId); - - // If our vote is rejected, we go immediately to the random backoff. This - // ensures that we are not stuck waiting for the election timeout when the - // vote has become gridlocked. - if (state.isVoteRejected() && !state.isBackingOff()) { - logger.info("Insufficient remaining votes to become leader (rejected by {}). " + - "We will backoff before retrying election again", state.rejectingVoters()); - - state.startBackingOff( - currentTimeMs, - binaryExponentialElectionBackoffMs(state.retries()) - ); - } + maybeCandidateStartBackingOff(currentTimeMs); Review Comment: This handles the case where the majority of the voters rejected the candidate. It also needs to handle the case where the majority of the voters reject the prospective candidate. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) { } /** - * Grant a vote to a candidate. We will transition/remain in Unattached - * state until either the election timeout expires or a leader is elected.
In particular, - * we do not begin fetching until the election has concluded and - * {@link #transitionToFollower(int, int, Endpoints)} is invoked. + * Grant a vote to a candidate as Unattached. We will transition to Unattached with votedKey + * state and remain there until either the election timeout expires or we discover the leader. */ - public void transitionToUnattachedVotedState( + public void unattachedAddVotedState( int epoch, ReplicaKey candidateKey ) { int currentEpoch = state.epoch(); if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since it matches the local " + + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + "broker.id", candidateKey, + state, epoch ) ); } else if (localId.isEmpty()) { - throw new IllegalStateException("Cannot transition to voted without a replica id"); - } else if (epoch < currentEpoch) { + throw new IllegalStateException("Cannot add voted state without a replica id"); + } else if (epoch != currentEpoch || !isUnattachedNotVoted()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since the current epoch " + - "(%d) is larger", + "Cannot add voted key (%s) to current state (%s) in epoch %d", candidateKey, - epoch, - currentEpoch + state, + epoch ) ); - } else if (epoch == currentEpoch && !isUnattachedNotVoted()) { + } + + // Note that we reset the election timeout after voting for a candidate because we + // know that the candidate has at least as good of a chance of getting elected as us + durableTransitionTo( + new UnattachedState( + time, + epoch, + state.election().optionalLeaderId(), + Optional.of(candidateKey), + partitionState.lastVoterSet().voterIds(), + state.highWatermark(), + randomElectionTimeoutMs(), + logContext + ) + ); + log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch); + } + + /** + * 
Grant a vote to a candidate as Prospective. We will transition to Prospective with votedKey + * state and remain there until either the election timeout expires or we discover the leader. + */ + public void prospectiveAddVotedState( + int epoch, + ReplicaKey candidateKey + ) { + int currentEpoch = state.epoch(); + if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d from the current state (%s)", + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + + "broker.id", Review Comment: Let's fix this formatting. ```java "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + "broker.id", ``` ########## raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.internals.EpochElection; + +import org.slf4j.Logger; + +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.raft.QuorumState.unattachedOrProspectiveCanGrantVote; + +public class ProspectiveState implements NomineeState { + private final int localId; + private final int epoch; + private final OptionalInt leaderId; + private final Optional<Endpoints> leaderEndpoints; + private final Optional<ReplicaKey> votedKey; + private final VoterSet voters; + private final EpochElection epochElection; + private final Optional<LogOffsetMetadata> highWatermark; + private final int retries; + private final long electionTimeoutMs; + private final Timer electionTimer; + private final Logger log; + + /** + * The lifetime of a prospective state is the following. + * + * 1. Once started, it will send prevote requests and keep record of the received vote responses + * 2. If it receives a message denoting a leader with a higher epoch, it will transition to follower state. + * 3. If majority votes granted, it will transition to candidate state. + * 4. 
If majority votes rejected or election times out, it will transition to unattached or follower state + * depending on if it knows the leader id and endpoints or not + */ + public ProspectiveState( + Time time, + int localId, + int epoch, + OptionalInt leaderId, + Optional<Endpoints> leaderEndpoints, + Optional<ReplicaKey> votedKey, + VoterSet voters, + Optional<LogOffsetMetadata> highWatermark, + int retries, + int electionTimeoutMs, + LogContext logContext + ) { + this.localId = localId; + this.epoch = epoch; + this.leaderId = leaderId; + this.leaderEndpoints = leaderEndpoints; + this.votedKey = votedKey; + this.voters = voters; + this.highWatermark = highWatermark; + this.retries = retries; + this.electionTimeoutMs = electionTimeoutMs; + this.electionTimer = time.timer(electionTimeoutMs); + this.log = logContext.logger(ProspectiveState.class); + + this.epochElection = new EpochElection(voters.voterKeys()); + epochElection.recordVote(localId, true); + } + + public int localId() { + return localId; + } + + public Optional<ReplicaKey> votedKey() { + return votedKey; + } + + @Override + public EpochElection epochElection() { + return epochElection; + } + + public int retries() { + return retries; + } Review Comment: I see. Retries is only preserved when the prospective transition from prospective to candidate. The retries are lost if it transitions to unattached. I think we should file a jira to remove this exponential backoff. 
I am convinced that it is starting to lose its value with this implementation, especially if we make the election timeout improvements you highlighted in the KIP. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -529,26 +589,47 @@ public void transitionToFollower(int epoch, int leaderId, Endpoints endpoints) { ); } - public void transitionToCandidate() { + public void transitionToProspective() { if (isObserver()) { throw new IllegalStateException( String.format( - "Cannot transition to Candidate since the local id (%s) and directory id (%s) " + - "is not one of the voters %s", + "Cannot transition to Prospective since the local id (%s) and directory id (%s) " + + "is not one of the voters %s", localId, localDirectoryId, partitionState.lastVoterSet() ) ); - } else if (isLeader()) { - throw new IllegalStateException("Cannot transition to Candidate since the local broker.id=" + localId + - " since this node is already a Leader with state " + state); + } else if (isLeader() || isProspective()) { + throw new IllegalStateException("Cannot transition to Prospective since the local broker.id=" + localId + + " is state " + state); } int retries = isCandidate() ? candidateStateOrThrow().retries() + 1 : 1; + + durableTransitionTo(new ProspectiveState( + time, + localIdOrThrow(), + epoch(), + leaderId(), + Optional.of(state.leaderEndpoints()), + votedKey(), + partitionState.lastVoterSet(), + state.highWatermark(), + retries, + randomElectionTimeoutMs(), + logContext + )); Review Comment: How about this formatting: ```java durableTransitionTo( new ProspectiveState( time, ... ) ); ``` ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) { } /** - * Grant a vote to a candidate. We will transition/remain in Unattached - * state until either the election timeout expires or a leader is elected.
In particular, - * we do not begin fetching until the election has concluded and - * {@link #transitionToFollower(int, int, Endpoints)} is invoked. + * Grant a vote to a candidate as Unattached. We will transition to Unattached with votedKey + * state and remain there until either the election timeout expires or we discover the leader. */ - public void transitionToUnattachedVotedState( + public void unattachedAddVotedState( int epoch, ReplicaKey candidateKey ) { int currentEpoch = state.epoch(); if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since it matches the local " + + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + "broker.id", candidateKey, + state, epoch ) ); } else if (localId.isEmpty()) { - throw new IllegalStateException("Cannot transition to voted without a replica id"); - } else if (epoch < currentEpoch) { + throw new IllegalStateException("Cannot add voted state without a replica id"); + } else if (epoch != currentEpoch || !isUnattachedNotVoted()) { Review Comment: Should this check that the epoch is not decreasing? ########## raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.internals.EpochElection; + +import org.slf4j.Logger; + +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.raft.QuorumState.unattachedOrProspectiveCanGrantVote; + +public class ProspectiveState implements NomineeState { + private final int localId; + private final int epoch; + private final OptionalInt leaderId; + private final Optional<Endpoints> leaderEndpoints; + private final Optional<ReplicaKey> votedKey; + private final VoterSet voters; + private final EpochElection epochElection; + private final Optional<LogOffsetMetadata> highWatermark; + private final int retries; + private final long electionTimeoutMs; + private final Timer electionTimer; + private final Logger log; + + /** + * The lifetime of a prospective state is the following. + * + * 1. Once started, it will send prevote requests and keep record of the received vote responses + * 2. If it receives a message denoting a leader with a higher epoch, it will transition to follower state. + * 3. If majority votes granted, it will transition to candidate state. + * 4. 
If majority votes rejected or election times out, it will transition to unattached or follower state + * depending on if it knows the leader id and endpoints or not + */ + public ProspectiveState( + Time time, + int localId, + int epoch, + OptionalInt leaderId, + Optional<Endpoints> leaderEndpoints, + Optional<ReplicaKey> votedKey, + VoterSet voters, + Optional<LogOffsetMetadata> highWatermark, + int retries, + int electionTimeoutMs, + LogContext logContext + ) { + this.localId = localId; + this.epoch = epoch; + this.leaderId = leaderId; + this.leaderEndpoints = leaderEndpoints; + this.votedKey = votedKey; + this.voters = voters; + this.highWatermark = highWatermark; + this.retries = retries; + this.electionTimeoutMs = electionTimeoutMs; + this.electionTimer = time.timer(electionTimeoutMs); + this.log = logContext.logger(ProspectiveState.class); + + this.epochElection = new EpochElection(voters.voterKeys()); + epochElection.recordVote(localId, true); + } + + public int localId() { + return localId; + } + + public Optional<ReplicaKey> votedKey() { + return votedKey; + } + + @Override + public EpochElection epochElection() { + return epochElection; + } + + public int retries() { + return retries; + } + + @Override + public boolean recordGrantedVote(int remoteNodeId) { + return epochElection().recordVote(remoteNodeId, true); + } + + @Override + public boolean recordRejectedVote(int remoteNodeId) { + if (remoteNodeId == localId) { + throw new IllegalArgumentException("Attempted to reject vote from ourselves"); + } + return epochElection().recordVote(remoteNodeId, false); + } + + @Override + public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) { + return unattachedOrProspectiveCanGrantVote( + leaderId, + votedKey, + epoch, + replicaKey, + isLogUpToDate, + isPreVote, + log + ); + } + + @Override + public boolean hasElectionTimeoutExpired(long currentTimeMs) { + electionTimer.update(currentTimeMs); + return 
electionTimer.isExpired(); + } + + @Override + public long remainingElectionTimeMs(long currentTimeMs) { + electionTimer.update(currentTimeMs); + return electionTimer.remainingMs(); + } + + @Override + public ElectionState election() { + return new ElectionState(epoch, leaderId, votedKey, voters.voterIds()); + } + + @Override + public int epoch() { + return epoch; + } + + @Override + public Endpoints leaderEndpoints() { + return leaderEndpoints.orElse(Endpoints.empty()); + } + + @Override + public Optional<LogOffsetMetadata> highWatermark() { + return highWatermark; + } + + @Override + public String toString() { + return String.format( + "Prospective(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, electionTimeoutMs=%s, highWatermark=%s)", Review Comment: Let's also include the epoch election and add a `toString` method to the `EpochElection` type. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -193,17 +200,6 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE randomElectionTimeoutMs(), logContext ); - } else if (election.hasVoted()) { - initialState = new UnattachedState( - time, - election.epoch(), - OptionalInt.empty(), - Optional.of(election.votedKey()), - partitionState.lastVoterSet().voterIds(), - Optional.empty(), - randomElectionTimeoutMs(), - logContext - ); Review Comment: Nice code removal! Thanks. ########## raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.raft.ReplicaKey; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Tracks the votes cast by voters in an election held by a Nominee. + */ +public class EpochElection { + private Map<Integer, VoterState> voterStates; + + public EpochElection(Set<ReplicaKey> voters) { + this.voterStates = voters.stream() + .collect(Collectors.toMap( + ReplicaKey::id, + VoterState::new + )); Review Comment: ```java .collect( Collectors.toMap( ReplicaKey::id, VoterState::new ) ); ``` ########## raft/src/main/java/org/apache/kafka/raft/UnattachedState.java: ########## @@ -120,60 +116,28 @@ public Optional<LogOffsetMetadata> highWatermark() { @Override public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) { - if (isPreVote) { - return canGrantPreVote(replicaKey, isLogUpToDate); - } else if (votedKey.isPresent()) { - ReplicaKey votedReplicaKey = votedKey.get(); - if (votedReplicaKey.id() == replicaKey.id()) { - return votedReplicaKey.directoryId().isEmpty() || votedReplicaKey.directoryId().equals(replicaKey.directoryId()); - } - log.debug( - "Rejecting Vote request (preVote=false) from candidate ({}), already have voted for another " + - "candidate ({}) in epoch {}", - replicaKey, - votedKey, - epoch - ); - return false; - } else if (leaderId.isPresent()) { - // If the leader id is known it should behave similar to the follower 
state - log.debug( - "Rejecting Vote request (preVote=false) from candidate ({}) since we already have a leader {} in epoch {}", - replicaKey, - leaderId, - epoch - ); - return false; - } else if (!isLogUpToDate) { - log.debug( - "Rejecting Vote request (preVote=false) from candidate ({}) since candidate epoch/offset is not up to date with us", - replicaKey - ); - } - - return isLogUpToDate; - } - - private boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) { - if (!isLogUpToDate) { - log.debug( - "Rejecting Vote request (preVote=true) from replica ({}) since replica's log is not up to date with us", - replicaKey - ); - } - - return isLogUpToDate; + return unattachedOrProspectiveCanGrantVote( + leaderId, + votedKey, + epoch, + replicaKey, + isLogUpToDate, + isPreVote, + log + ); } @Override public String toString() { - return "Unattached(" + - "epoch=" + epoch + - ", votedKey=" + votedKey.map(ReplicaKey::toString).orElse("null") + - ", voters=" + voters + - ", electionTimeoutMs=" + electionTimeoutMs + - ", highWatermark=" + highWatermark + - ')'; + return String.format("Unattached(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " + + "electionTimeoutMs=%d, highWatermark=%s)", Review Comment: ```java return String.format( "Unattached(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " + "electionTimeoutMs=%d, highWatermark=%s)", ``` ########## raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java: ########## @@ -71,10 +71,14 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix, QuorumState sta Gauge<String> stateProvider = (mConfig, currentTimeMs) -> { if (state.isLeader()) { return "leader"; + } else if (state.isProspectiveNotVoted()) { + return "prospective"; + } else if (state.isProspectiveAndVoted()) { + return "prospective-voted"; } else if (state.isCandidate()) { return "candidate"; } else if (state.isUnattachedAndVoted()) { - return "voted"; + return "unattached-voted"; Review Comment: This is a publicly visible 
change. Let's update the KIP if it doesn't include this change. ########## raft/src/main/java/org/apache/kafka/raft/UnattachedState.java: ########## @@ -71,13 +73,7 @@ public UnattachedState( @Override public ElectionState election() { - if (votedKey.isPresent()) { - return ElectionState.withVotedCandidate(epoch, votedKey().get(), voters); - } else if (leaderId.isPresent()) { - return ElectionState.withElectedLeader(epoch, leaderId.getAsInt(), voters); - } else { - return ElectionState.withUnknownLeader(epoch, voters); - } + return new ElectionState(epoch, leaderId, votedKey, voters); Review Comment: Let's undo this change. It is good to keep the existing invariant to avoid persisting both `leaderId` and `votedKey`. ########## raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.raft.ReplicaKey; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Tracks the votes cast by voters in an election held by a Nominee. 
+ */ +public class EpochElection { + private Map<Integer, VoterState> voterStates; + + public EpochElection(Set<ReplicaKey> voters) { + this.voterStates = voters.stream() + .collect(Collectors.toMap( + ReplicaKey::id, + VoterState::new + )); + } + + /** + * Record a vote from a voter. + * @param voterId The id of the voter + * @param isGranted true if the vote is granted, false if it is rejected + * @return true if the voter had not been previously recorded + */ + public boolean recordVote(int voterId, boolean isGranted) { + boolean wasUnrecorded = false; + VoterState voterState = getVoterStateOrThrow(voterId); + if (voterState.state == VoterState.State.UNRECORDED) { + wasUnrecorded = true; + } + if (isGranted) { + voterState.setState(VoterState.State.GRANTED); + } else { + voterState.setState(VoterState.State.REJECTED); + } + return wasUnrecorded; + } + + /** + * Returns if a voter has granted the vote. + * @param voterId The id of the voter + * @throws IllegalArgumentException if the voter is not in the set of voters + */ + public boolean isGrantedVoter(int voterId) { + return getVoterStateOrThrow(voterId).state == VoterState.State.GRANTED; + } + + /** + * Returns if a voter has rejected the vote. + * @param voterId The id of the voter + * @throws IllegalArgumentException if the voter is not in the set of voters + */ + public boolean isRejectedVoter(int voterId) { + return getVoterStateOrThrow(voterId).state == VoterState.State.REJECTED; + } + + /** + * The set of voter ids. + */ + public Set<Integer> voterIds() { + return Collections.unmodifiableSet(voterStates.keySet()); + } + + /** + * Get the collection of voter states. + */ + public Collection<VoterState> voterStates() { + return Collections.unmodifiableCollection(voterStates.values()); + } + + /** + * Check whether we have received enough votes to conclude the election and become leader. 
+ * + * @return true if at least a majority of nodes have granted the vote + */ + public boolean isVoteGranted() { + return numGranted() >= majoritySize(); + } + + /** + * Check if we have received enough rejections that it is no longer possible to reach a + * majority of grants. + * + * @return true if the vote is rejected, false if the vote is already or can still be granted + */ + public boolean isVoteRejected() { + return numGranted() + numUnrecorded() < majoritySize(); + } + + /** + * Get the set of voters which have not been counted as granted or rejected yet. + * + * @return The set of unrecorded voters + */ + public Set<ReplicaKey> unrecordedVoters() { + return votersOfState(VoterState.State.UNRECORDED).collect(Collectors.toSet()); + } + + /** + * Get the set of voters that have granted our vote requests. + * + * @return The set of granting voters, which should always contain the localId + */ + public Set<Integer> grantingVoters() { + return votersOfState(VoterState.State.GRANTED).map(ReplicaKey::id).collect(Collectors.toSet()); + } + + /** + * Get the set of voters that have rejected our candidacy. 
+ * + * @return The set of rejecting voters + */ + public Set<Integer> rejectingVoters() { + return votersOfState(VoterState.State.REJECTED).map(ReplicaKey::id).collect(Collectors.toSet()); + } + + private VoterState getVoterStateOrThrow(int voterId) { + VoterState voterState = voterStates.get(voterId); + if (voterState == null) { + throw new IllegalArgumentException("Attempt to access voter state of non-voter " + voterId); + } + return voterState; + } + + private Stream<ReplicaKey> votersOfState(VoterState.State state) { + return voterStates + .values() + .stream() + .filter(voterState -> voterState.state().equals(state)) + .map(VoterState::replicaKey); + } + + private long numGranted() { + return votersOfState(VoterState.State.GRANTED).count(); + } + + private long numUnrecorded() { + return votersOfState(VoterState.State.UNRECORDED).count(); + } + + private int majoritySize() { + return voterStates.size() / 2 + 1; + } + + private static final class VoterState { Review Comment: Let's add a `toString` method to this type so that its value is included in log messages. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -957,13 +998,33 @@ private boolean handleVoteResponse( } } + private void maybeCandidateStartBackingOff(long currentTimeMs) { Review Comment: This code only has one caller. How about manually inlining it at the call site? ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) { } /** - * Grant a vote to a candidate. We will transition/remain in Unattached - * state until either the election timeout expires or a leader is elected. In particular, - * we do not begin fetching until the election has concluded and - * {@link #transitionToFollower(int, int, Endpoints)} is invoked. + * Grant a vote to a candidate as Unattached.
We will transition to Unattached with votedKey + * state and remain there until either the election timeout expires or we discover the leader. */ - public void transitionToUnattachedVotedState( + public void unattachedAddVotedState( int epoch, ReplicaKey candidateKey ) { int currentEpoch = state.epoch(); if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since it matches the local " + + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + "broker.id", candidateKey, + state, epoch ) ); } else if (localId.isEmpty()) { - throw new IllegalStateException("Cannot transition to voted without a replica id"); - } else if (epoch < currentEpoch) { + throw new IllegalStateException("Cannot add voted state without a replica id"); + } else if (epoch != currentEpoch || !isUnattachedNotVoted()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since the current epoch " + - "(%d) is larger", + "Cannot add voted key (%s) to current state (%s) in epoch %d", candidateKey, - epoch, - currentEpoch + state, + epoch ) ); - } else if (epoch == currentEpoch && !isUnattachedNotVoted()) { + } + + // Note that we reset the election timeout after voting for a candidate because we + // know that the candidate has at least as good of a chance of getting elected as us + durableTransitionTo( + new UnattachedState( + time, + epoch, + state.election().optionalLeaderId(), + Optional.of(candidateKey), + partitionState.lastVoterSet().voterIds(), + state.highWatermark(), + randomElectionTimeoutMs(), + logContext + ) + ); + log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch); + } + + /** + * Grant a vote to a candidate as Prospective. We will transition to Prospective with votedKey + * state and remain there until either the election timeout expires or we discover the leader. 
+ */ + public void prospectiveAddVotedState( + int epoch, + ReplicaKey candidateKey + ) { + int currentEpoch = state.epoch(); + if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d from the current state (%s)", + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + + "broker.id", candidateKey, - epoch, - state + state, + epoch + ) + ); + } else if (localId.isEmpty()) { + throw new IllegalStateException("Cannot add voted state without a replica id"); + } else if (epoch != currentEpoch || !isProspectiveNotVoted()) { Review Comment: This should check that the epoch is not decreasing. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -529,26 +589,47 @@ public void transitionToFollower(int epoch, int leaderId, Endpoints endpoints) { ); } - public void transitionToCandidate() { + public void transitionToProspective() { if (isObserver()) { throw new IllegalStateException( String.format( - "Cannot transition to Candidate since the local id (%s) and directory id (%s) " + - "is not one of the voters %s", + "Cannot transition to Prospective since the local id (%s) and directory id (%s) " + + "is not one of the voters %s", Review Comment: Fix formatting. ########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -718,7 +849,64 @@ public boolean isResigned() { return state instanceof ResignedState; } + public boolean isProspective() { + return state instanceof ProspectiveState; + } + public boolean isCandidate() { return state instanceof CandidateState; } + + public boolean isNomineeState() { + return state instanceof NomineeState; + } + + public static boolean unattachedOrProspectiveCanGrantVote( Review Comment: Let's add a java doc comment to this method. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org