jsancio commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1896920647


##########
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##########
@@ -299,13 +205,12 @@ public boolean canGrantVote(
     @Override
     public String toString() {
         return String.format(
-            "CandidateState(localId=%d, localDirectoryId=%s,epoch=%d, 
retries=%d, voteStates=%s, " +
+            "Candidate(localId=%d, localDirectoryId=%s, epoch=%d, 
voteStates=%s, " +

Review Comment:
   The type should be `CandidateState`. Let's add the field `retries` back to 
the message.
   
   How about replacing `voteStates` with `epochState` and implementing a 
`toString` method for `EpochState`?
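
   For illustration, here is a minimal sketch of what delegating to a nested 
`toString` could look like. `VoteTrackerSketch` and `CandidateToStringSketch` 
are hypothetical stand-ins, not the real Kafka classes, and the field set is 
trimmed down; only the `CandidateState(...)` prefix and the `retries` field come 
from the comment above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the vote-tracking object; not the real Kafka type.
final class VoteTrackerSketch {
    enum State { UNRECORDED, GRANTED, REJECTED }

    // LinkedHashMap keeps insertion order so the printed form is deterministic.
    private final Map<Integer, State> voterStates = new LinkedHashMap<>();

    VoteTrackerSketch(int... voterIds) {
        for (int id : voterIds) {
            voterStates.put(id, State.UNRECORDED);
        }
    }

    void recordVote(int voterId, boolean granted) {
        voterStates.put(voterId, granted ? State.GRANTED : State.REJECTED);
    }

    @Override
    public String toString() {
        return String.format("VoteTrackerSketch(voterStates=%s)", voterStates);
    }
}

// Hypothetical, trimmed-down candidate state that delegates to the tracker.
final class CandidateToStringSketch {
    private final int localId;
    private final int epoch;
    private final int retries;
    private final VoteTrackerSketch epochElection;

    CandidateToStringSketch(int localId, int epoch, int retries, VoteTrackerSketch epochElection) {
        this.localId = localId;
        this.epoch = epoch;
        this.retries = retries;
        this.epochElection = epochElection;
    }

    @Override
    public String toString() {
        return String.format(
            "CandidateState(localId=%d, epoch=%d, retries=%d, epochElection=%s)",
            localId,
            epoch,
            retries,
            epochElection
        );
    }
}
```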



##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -75,7 +79,7 @@ public FollowerState(
 
     @Override
     public ElectionState election() {
-        return ElectionState.withElectedLeader(epoch, leaderId, voters);
+        return new ElectionState(epoch, OptionalInt.of(leaderId), votedKey, 
voters);

Review Comment:
   Let's revert this change. We should make it clear that this PR is not 
changing the persisted `quorum-state`.



##########
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##########
@@ -114,137 +99,58 @@ public boolean isBackingOff() {
         return isBackingOff;
     }
 
-    public int retries() {
-        return retries;
-    }
-
     /**
-     * Check whether we have received enough votes to conclude the election 
and become leader.
-     *
-     * @return true if at least a majority of nodes have granted the vote
+     * Record the current election has failed since we've either received 
sufficient rejecting voters or election timed out
      */
-    public boolean isVoteGranted() {
-        return numGranted() >= majoritySize();
+    public void startBackingOff(long currentTimeMs, long backoffDurationMs) {
+        this.backoffTimer.update(currentTimeMs);
+        this.backoffTimer.reset(backoffDurationMs);
+        this.isBackingOff = true;
     }
 
-    /**
-     * Check if we have received enough rejections that it is no longer 
possible to reach a
-     * majority of grants.
-     *
-     * @return true if the vote is rejected, false if the vote is already or 
can still be granted
-     */
-    public boolean isVoteRejected() {
-        return numGranted() + numUnrecorded() < majoritySize();
+    public boolean isBackoffComplete(long currentTimeMs) {
+        backoffTimer.update(currentTimeMs);
+        return backoffTimer.isExpired();
     }
 
-    /**
-     * Record a granted vote from one of the voters.
-     *
-     * @param remoteNodeId The id of the voter
-     * @return true if the voter had not been previously recorded
-     * @throws IllegalArgumentException if the remote node is not a voter or 
if the vote had already been
-     *         rejected by this node
-     */
+    public long remainingBackoffMs(long currentTimeMs) {
+        if (!isBackingOff) {
+            throw new IllegalStateException("Candidate is not currently 
backing off");
+        }
+        backoffTimer.update(currentTimeMs);
+        return backoffTimer.remainingMs();
+    }

Review Comment:
   Let's move this method so that it doesn't show in the diff.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -679,8 +706,26 @@ private void transitionToCandidate(long currentTimeMs) {
         onBecomeCandidate(currentTimeMs);
     }
 
+    private void onBecomeProspective(long currentTimeMs) {
+        ProspectiveState state = quorum.prospectiveStateOrThrow();
+        if (!maybeTransitionToCandidate(state, currentTimeMs)) {
+            resetConnections();
+            kafkaRaftMetrics.updateElectionStartMs(currentTimeMs);
+        }
+    }
+
+    private void transitionToProspective(long currentTimeMs) {
+        quorum.transitionToProspective();
+        maybeFireLeaderChange();
+        onBecomeProspective(currentTimeMs);
+    }
+
     private void transitionToUnattached(int epoch) {
-        quorum.transitionToUnattached(epoch);
+        transitionToUnattached(epoch, OptionalInt.empty());
+    }

Review Comment:
   Let's remove this and update the callers to use 
`transitionToUnattached(int, OptionalInt)`.



##########
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##########
@@ -114,137 +99,58 @@ public boolean isBackingOff() {
         return isBackingOff;
     }
 
-    public int retries() {
-        return retries;
-    }
-
     /**
-     * Check whether we have received enough votes to conclude the election 
and become leader.
-     *
-     * @return true if at least a majority of nodes have granted the vote
+     * Record the current election has failed since we've either received 
sufficient rejecting voters or election timed out
      */
-    public boolean isVoteGranted() {
-        return numGranted() >= majoritySize();
+    public void startBackingOff(long currentTimeMs, long backoffDurationMs) {
+        this.backoffTimer.update(currentTimeMs);
+        this.backoffTimer.reset(backoffDurationMs);
+        this.isBackingOff = true;
     }
 
-    /**
-     * Check if we have received enough rejections that it is no longer 
possible to reach a
-     * majority of grants.
-     *
-     * @return true if the vote is rejected, false if the vote is already or 
can still be granted
-     */
-    public boolean isVoteRejected() {
-        return numGranted() + numUnrecorded() < majoritySize();
+    public boolean isBackoffComplete(long currentTimeMs) {
+        backoffTimer.update(currentTimeMs);
+        return backoffTimer.isExpired();

Review Comment:
   Let's move this method so that it doesn't show in the diff.



##########
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##########
@@ -85,26 +80,16 @@ protected CandidateState(
         this.backoffTimer = time.timer(0);
         this.log = logContext.logger(CandidateState.class);
 
-        for (ReplicaKey voter : voters.voterKeys()) {
-            voteStates.put(voter.id(), new VoterState(voter));
-        }
-        voteStates.get(localId).setState(State.GRANTED);
+        this.epochElection = new EpochElection(voters.voterKeys());
+        epochElection.recordVote(localId, true);
     }
 
     public int localId() {
         return localId;
     }
 
-    public int majoritySize() {
-        return voteStates.size() / 2 + 1;
-    }
-
-    private long numGranted() {
-        return votersInState(State.GRANTED).count();
-    }
-
-    private long numUnrecorded() {
-        return votersInState(State.UNRECORDED).count();
+    public int retries() {
+        return retries;

Review Comment:
   Let's move this method so that it doesn't show in the diff.



##########
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##########
@@ -114,137 +99,58 @@ public boolean isBackingOff() {
         return isBackingOff;
     }
 
-    public int retries() {
-        return retries;
-    }
-
     /**
-     * Check whether we have received enough votes to conclude the election 
and become leader.
-     *
-     * @return true if at least a majority of nodes have granted the vote
+     * Record the current election has failed since we've either received 
sufficient rejecting voters or election timed out
      */
-    public boolean isVoteGranted() {
-        return numGranted() >= majoritySize();
+    public void startBackingOff(long currentTimeMs, long backoffDurationMs) {
+        this.backoffTimer.update(currentTimeMs);
+        this.backoffTimer.reset(backoffDurationMs);
+        this.isBackingOff = true;

Review Comment:
   Let's move this method so that it doesn't show in the diff.



##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -118,6 +122,10 @@ public Node leaderNode(ListenerName listener) {
             );
     }
 
+    public Optional<ReplicaKey> votedKey() {
+        return votedKey;
+    }
+

Review Comment:
   I got the impression that you don't use this in `src/main`. If so, let's 
remove it.



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -368,14 +369,26 @@ public void transitionToResigned(List<ReplicaKey> 
preferredSuccessors) {
     }
 
     /**
-     * Transition to the "unattached" state. This means we have found an epoch 
greater than the current epoch,
-     * but we do not yet know of the elected leader.
+     * Transition to the "unattached" state. This means we have found an epoch 
greater than the current epoch
+     * and do not yet know of the elected leader, or we have transitioned from 
Prospective with the same epoch.
+     * Note, if we are transitioning from unattached and there is no epoch 
change, we take the path of
+     * unattachedAddVotedState instead.
      */
+    // Used in testing
     public void transitionToUnattached(int epoch) {
+        transitionToUnattached(epoch, OptionalInt.empty());
+    }

Review Comment:
   Let's just remove this method and have the tests use 
`transitionToUnattached(int, OptionalInt)`.
   
   Also update the Javadoc to match the new signature and parameters.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -883,6 +928,13 @@ private boolean handleVoteResponse(
         VoteResponseData response = (VoteResponseData) responseMetadata.data();
         Errors topLevelError = Errors.forCode(response.errorCode());
         if (topLevelError != Errors.NONE) {
+            if (topLevelError == Errors.UNSUPPORTED_VERSION && 
quorum.isProspective()) {
+                logger.info("Prospective received unsupported version error in 
vote response in epoch {}, " +
+                        "transitioning to Candidate state immediately since 
entire quorum may not support PreVote.",
+                    quorum.epoch());
+                transitionToCandidate(currentTimeMs);
+                return true;
+            }
             return handleTopLevelError(topLevelError, responseMetadata);

Review Comment:
   How about:
   ```java
           if (topLevelError == Errors.UNSUPPORTED_VERSION && 
quorum.isProspective()) {
               logger.info(
                   "Prospective received unsupported version error in vote 
response in epoch {}, " +
                   "transitioning to Candidate state immediately since entire 
quorum may not support PreVote.",
                   quorum.epoch()
               );
               transitionToCandidate(currentTimeMs);
               return true;
           } else if (topLevelError != Errors.NONE) {
               return handleTopLevelError(topLevelError, responseMetadata);
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2442,10 +2503,10 @@ private void maybeTransition(
                 " and epoch " + epoch + " which is inconsistent with current 
leader " +
                 quorum.leaderId() + " and epoch " + quorum.epoch());
         } else if (epoch > quorum.epoch()) {
-            if (leaderId.isPresent()) {
+            if (leaderId.isPresent() && !leaderEndpoints.isEmpty()) {

Review Comment:
   I think this is too relaxed. The previous code assumed that if 
`leaderId.isPresent()` then `!leaderEndpoints.isEmpty()`. That should not 
change in this PR.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -957,13 +998,33 @@ private boolean handleVoteResponse(
         }
     }
 
+    private void maybeCandidateStartBackingOff(long currentTimeMs) {
+        // If in candidate state and vote is rejected, go immediately to a 
random, exponential backoff. The
+        // backoff starts low to prevent needing to wait the entire election 
timeout when the vote
+        // result has already been determined. The randomness prevents the 
next election from being
+        // gridlocked with another nominee due to timing. The exponential 
aspect limits epoch churn when
+        // the replica has failed multiple elections in succession.
+        if (quorum.isCandidate()) {
+            CandidateState candidate = quorum.candidateStateOrThrow();
+            if (candidate.epochElection().isVoteRejected() && 
!candidate.isBackingOff()) {
+                logger.info("Insufficient remaining votes to become leader 
(rejected by {}). " +
+                    "We will backoff before retrying election again", 
candidate.epochElection().rejectingVoters());

Review Comment:
   Let's fix this formatting. See my other examples of how we try to format 
code in the raft module.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -957,13 +998,33 @@ private boolean handleVoteResponse(
         }
     }
 
+    private void maybeCandidateStartBackingOff(long currentTimeMs) {
+        // If in candidate state and vote is rejected, go immediately to a 
random, exponential backoff. The
+        // backoff starts low to prevent needing to wait the entire election 
timeout when the vote
+        // result has already been determined. The randomness prevents the 
next election from being
+        // gridlocked with another nominee due to timing. The exponential 
aspect limits epoch churn when
+        // the replica has failed multiple elections in succession.
+        if (quorum.isCandidate()) {
+            CandidateState candidate = quorum.candidateStateOrThrow();
+            if (candidate.epochElection().isVoteRejected() && 
!candidate.isBackingOff()) {
+                logger.info("Insufficient remaining votes to become leader 
(rejected by {}). " +
+                    "We will backoff before retrying election again", 
candidate.epochElection().rejectingVoters());
+
+                candidate.startBackingOff(
+                    currentTimeMs,
+                    binaryExponentialElectionBackoffMs(candidate.retries())
+                );
+            }
+        }
+    }
+
     private int binaryExponentialElectionBackoffMs(int retries) {
         if (retries <= 0) {
             throw new IllegalArgumentException("Retries " + retries + " should 
be larger than zero");
         }
         // upper limit exponential co-efficients at 20 to avoid overflow
         return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << 
Math.min(20, retries - 1)),
-                quorumConfig.electionBackoffMaxMs());
+            quorumConfig.electionBackoffMaxMs());

Review Comment:
   Same here. Let's fix the indentation since you are already changing this 
part of the code.
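
   As a rough sketch of the formatting being asked for, the expression could be 
laid out with one argument per line. The class name, the `Random` parameter, and 
the value of `RETRY_BACKOFF_BASE_MS` below are assumptions made so the snippet 
compiles on its own; the arithmetic is copied from the diff above.

```java
import java.util.Random;

// Hypothetical standalone holder; in the PR this method lives in KafkaRaftClient.
final class ElectionBackoffSketch {
    // Assumed value, for illustration only.
    static final int RETRY_BACKOFF_BASE_MS = 50;

    static int binaryExponentialElectionBackoffMs(
        int retries,
        int electionBackoffMaxMs,
        Random random
    ) {
        if (retries <= 0) {
            throw new IllegalArgumentException(
                "Retries " + retries + " should be larger than zero"
            );
        }
        // Cap the exponent at 20 to avoid overflow. The random factor is drawn
        // from [0, 2^retries) until the cap is reached.
        return Math.min(
            RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)),
            electionBackoffMaxMs
        );
    }
}
```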



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -926,29 +978,18 @@ private boolean handleVoteResponse(
             if (quorum.isLeader()) {
                 logger.debug("Ignoring vote response {} since we already 
became leader for epoch {}",
                     partitionResponse, quorum.epoch());
-            } else if (quorum.isCandidate()) {
-                CandidateState state = quorum.candidateStateOrThrow();
+            } else if (quorum.isNomineeState()) {
+                NomineeState state = quorum.nomineeStateOrThrow();
                 if (partitionResponse.voteGranted()) {
                     state.recordGrantedVote(remoteNodeId);
-                    maybeTransitionToLeader(state, currentTimeMs);
+                    maybeTransitionForward(state, currentTimeMs);
                 } else {
                     state.recordRejectedVote(remoteNodeId);
-
-                    // If our vote is rejected, we go immediately to the 
random backoff. This
-                    // ensures that we are not stuck waiting for the election 
timeout when the
-                    // vote has become gridlocked.
-                    if (state.isVoteRejected() && !state.isBackingOff()) {
-                        logger.info("Insufficient remaining votes to become 
leader (rejected by {}). " +
-                            "We will backoff before retrying election again", 
state.rejectingVoters());
-
-                        state.startBackingOff(
-                            currentTimeMs,
-                            binaryExponentialElectionBackoffMs(state.retries())
-                        );
-                    }
+                    maybeCandidateStartBackingOff(currentTimeMs);
                 }
             } else {
-                logger.debug("Ignoring vote response {} since we are no longer 
a candidate in epoch {}",
+                logger.debug("Ignoring vote response {} since we are no longer 
a VotingState " +
+                        "(Prospective or Candidate) in epoch {}",
                     partitionResponse, quorum.epoch());

Review Comment:
   Let's fix the indentation. In the raft module we use this formatting style:
   ```java
                   logger.debug(
                       "Ignoring vote response {} since we are no longer a 
VotingState " +
                       "(Prospective or Candidate) in epoch {}",
                       partitionResponse,
                       quorum.epoch()
                   );
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2442,10 +2503,10 @@ private void maybeTransition(
                 " and epoch " + epoch + " which is inconsistent with current 
leader " +
                 quorum.leaderId() + " and epoch " + quorum.epoch());
         } else if (epoch > quorum.epoch()) {
-            if (leaderId.isPresent()) {
+            if (leaderId.isPresent() && !leaderEndpoints.isEmpty()) {
                 transitionToFollower(epoch, leaderId.getAsInt(), 
leaderEndpoints, currentTimeMs);
             } else {
-                transitionToUnattached(epoch);
+                transitionToUnattached(epoch, leaderId);

Review Comment:
   When would raft hit this case?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2760,7 +2822,7 @@ private VoteRequestData buildVoteRequest(ReplicaKey 
remoteVoter) {
             remoteVoter,
             endOffset.epoch(),
             endOffset.offset(),
-            false
+            isPreVote

Review Comment:
   Minor but you can just inline the expression: `quorum.isProspective()`.
   
   Also, the call site that calls this knows the `NomineeState` so it can use 
that information to determine if the request is a preVote request. No need to 
query `QuorumState` for this information.
   
   For example, you can add `isPreVote` to the `NomineeState` interface and 
update the signature of this method to `buildVoteRequest(ReplicaKey, boolean)`.
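
   Roughly, the shape I have in mind is sketched below. Every type here is a 
hypothetical stand-in (an `int` voter id instead of `ReplicaKey`, a tiny request 
holder instead of `VoteRequestData`); only the `isPreVote` method on the nominee 
interface and the extra `boolean` parameter come from the suggestion above.

```java
// Hypothetical stand-in for the NomineeState interface.
interface NomineeSketch {
    boolean isPreVote();
}

// A prospective replica sends pre-vote requests.
final class ProspectiveNomineeSketch implements NomineeSketch {
    @Override
    public boolean isPreVote() {
        return true;
    }
}

// A candidate sends standard vote requests.
final class CandidateNomineeSketch implements NomineeSketch {
    @Override
    public boolean isPreVote() {
        return false;
    }
}

// Hypothetical, minimal request holder used only for this sketch.
final class VoteRequestSketch {
    final int remoteVoterId;
    final boolean preVote;

    VoteRequestSketch(int remoteVoterId, boolean preVote) {
        this.remoteVoterId = remoteVoterId;
        this.preVote = preVote;
    }
}

final class PreVoteRequestBuilderSketch {
    // The caller passes the flag; the builder never queries the quorum state.
    static VoteRequestSketch buildVoteRequest(int remoteVoterId, boolean isPreVote) {
        return new VoteRequestSketch(remoteVoterId, isPreVote);
    }

    // The call site already holds the nominee state, so it supplies the flag.
    static VoteRequestSketch requestFor(NomineeSketch state, int remoteVoterId) {
        return buildVoteRequest(remoteVoterId, state.isPreVote());
    }
}
```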



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -385,15 +398,17 @@ public void transitionToUnattached(int epoch) {
             electionTimeoutMs = 
candidateStateOrThrow().remainingElectionTimeMs(time.milliseconds());
         } else if (isUnattached()) {
             electionTimeoutMs = 
unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
+        } else if (isProspective() && 
!prospectiveStateOrThrow().epochElection().isVoteRejected()) {
+            electionTimeoutMs = 
prospectiveStateOrThrow().remainingElectionTimeMs(time.milliseconds());
         } else {
             electionTimeoutMs = randomElectionTimeoutMs();
         }
 
         durableTransitionTo(new UnattachedState(
             time,
             epoch,
-            OptionalInt.empty(),
-            Optional.empty(),
+            leaderId,
+            epoch == currentEpoch ? votedKey() : Optional.empty(),

Review Comment:
   Let's write a comment explaining why this expression is needed.
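
   One possible reading of the expression, written as a standalone sketch with 
the kind of comment I am asking for. The class and method names are 
hypothetical and a `String` stands in for `ReplicaKey`. The reasoning in the 
comment assumes the usual Raft rule that a replica grants at most one standard 
vote per epoch; please correct it if the intent is different.

```java
import java.util.Optional;

// Hypothetical helper isolating the expression from the diff above.
final class VotedKeySketch {
    static Optional<String> votedKeyAfterTransition(
        int newEpoch,
        int currentEpoch,
        Optional<String> currentVotedKey
    ) {
        // A vote is only meaningful for the epoch in which it was granted.
        // If the epoch is unchanged (for example when moving from Prospective
        // back to Unattached), keep the voted key so the replica cannot grant
        // a second vote in the same epoch. If the epoch increased, the old
        // vote no longer applies and is dropped.
        return newEpoch == currentEpoch ? currentVotedKey : Optional.empty();
    }
}
```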



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3026,20 +3088,46 @@ private long pollCandidate(long currentTimeMs) {
             return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
         } else if (state.isBackingOff()) {
             if (state.isBackoffComplete(currentTimeMs)) {
-                logger.info("Re-elect as candidate after election backoff has 
completed");
-                transitionToCandidate(currentTimeMs);
+                logger.info("Transition to prospective after election backoff 
has completed");
+                transitionToProspective(currentTimeMs);
                 return 0L;
             }
             return state.remainingBackoffMs(currentTimeMs);
         } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
-            long backoffDurationMs = 
binaryExponentialElectionBackoffMs(state.retries());
-            logger.info("Election has timed out, backing off for {}ms before 
becoming a candidate again",
-                backoffDurationMs);
-            state.startBackingOff(currentTimeMs, backoffDurationMs);
-            return backoffDurationMs;
+            logger.info("Election was not granted, transitioning to 
prospective");
+            transitionToProspective(currentTimeMs);
+            return 0L;
         } else {
+            long minVoteRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
+            return Math.min(minVoteRequestBackoffMs, 
state.remainingElectionTimeMs(currentTimeMs));
+        }
+    }
+
+    private long pollProspective(long currentTimeMs) {
+        ProspectiveState state = quorum.prospectiveStateOrThrow();
+        GracefulShutdown shutdown = this.shutdown.get();
+
+        if (shutdown != null) {
             long minRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
-            return Math.min(minRequestBackoffMs, 
state.remainingElectionTimeMs(currentTimeMs));
+            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
+        } else if (state.epochElection().isVoteRejected() || 
state.hasElectionTimeoutExpired(currentTimeMs)) {

Review Comment:
   This code shouldn't check `isVoteRejected` if it is handled in 
`handleVoteResponse`. I left a comment about this in that method.
   
   This is an event-driven programming model. The event that causes the majority 
of the voters to reject the prospective state is received in the vote response, 
not when polling the prospective state.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -926,29 +978,18 @@ private boolean handleVoteResponse(
             if (quorum.isLeader()) {
                 logger.debug("Ignoring vote response {} since we already 
became leader for epoch {}",
                     partitionResponse, quorum.epoch());
-            } else if (quorum.isCandidate()) {
-                CandidateState state = quorum.candidateStateOrThrow();
+            } else if (quorum.isNomineeState()) {
+                NomineeState state = quorum.nomineeStateOrThrow();
                 if (partitionResponse.voteGranted()) {
                     state.recordGrantedVote(remoteNodeId);
-                    maybeTransitionToLeader(state, currentTimeMs);
+                    maybeTransitionForward(state, currentTimeMs);
                 } else {
                     state.recordRejectedVote(remoteNodeId);
-
-                    // If our vote is rejected, we go immediately to the 
random backoff. This
-                    // ensures that we are not stuck waiting for the election 
timeout when the
-                    // vote has become gridlocked.
-                    if (state.isVoteRejected() && !state.isBackingOff()) {
-                        logger.info("Insufficient remaining votes to become 
leader (rejected by {}). " +
-                            "We will backoff before retrying election again", 
state.rejectingVoters());
-
-                        state.startBackingOff(
-                            currentTimeMs,
-                            binaryExponentialElectionBackoffMs(state.retries())
-                        );
-                    }
+                    maybeCandidateStartBackingOff(currentTimeMs);

Review Comment:
   This handles the case where the majority of the voters rejected the 
candidate. It also needs to handle the case where the majority of the voters 
reject the prospective candidate.
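
   To make the idea concrete, here is a simplified sketch of handling both 
cases at the point where the rejection is recorded. Everything in it is 
hypothetical: the class, the `Role` enum, and the boolean vote map are 
stand-ins for the real state classes. The majority arithmetic mirrors the 
removed `isVoteRejected`. The sketch always sends a rejected prospective to 
unattached, whereas the real code may pick follower when the leader is known.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of reacting to a rejected vote response.
final class VoteResponseSketch {
    enum Role { PROSPECTIVE, CANDIDATE, UNATTACHED }

    private final Map<Integer, Boolean> votes = new HashMap<>(); // voterId -> granted
    private final int voterCount;
    Role role;
    boolean backingOff = false;

    VoteResponseSketch(Role role, int localId, int voterCount) {
        this.role = role;
        this.voterCount = voterCount;
        votes.put(localId, true); // a nominee always votes for itself
    }

    private int majoritySize() {
        return voterCount / 2 + 1;
    }

    // True once the granted plus outstanding votes can no longer reach a majority.
    private boolean isVoteRejected() {
        long granted = votes.values().stream().filter(g -> g).count();
        long unrecorded = voterCount - votes.size();
        return granted + unrecorded < majoritySize();
    }

    // Called from the vote response handler, not from the poll loop.
    void onVoteRejected(int remoteNodeId) {
        votes.put(remoteNodeId, false);
        if (!isVoteRejected()) {
            return;
        }
        if (role == Role.CANDIDATE && !backingOff) {
            backingOff = true; // candidate: start the election backoff
        } else if (role == Role.PROSPECTIVE) {
            role = Role.UNATTACHED; // prospective: give up the pre-vote
        }
    }
}
```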



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) {
     }
 
     /**
-     * Grant a vote to a candidate. We will transition/remain in Unattached
-     * state until either the election timeout expires or a leader is elected. 
In particular,
-     * we do not begin fetching until the election has concluded and
-     * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
+     * Grant a vote to a candidate as Unattached. We will transition to 
Unattached with votedKey
+     * state and remain there until either the election timeout expires or we 
discover the leader.
      */
-    public void transitionToUnattachedVotedState(
+    public void unattachedAddVotedState(
         int epoch,
         ReplicaKey candidateKey
     ) {
         int currentEpoch = state.epoch();
         if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d since it 
matches the local " +
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
                     "broker.id",
                     candidateKey,
+                    state,
                     epoch
                 )
             );
         } else if (localId.isEmpty()) {
-            throw new IllegalStateException("Cannot transition to voted 
without a replica id");
-        } else if (epoch < currentEpoch) {
+            throw new IllegalStateException("Cannot add voted state without a 
replica id");
+        } else if (epoch != currentEpoch || !isUnattachedNotVoted()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d since the 
current epoch " +
-                    "(%d) is larger",
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d",
                     candidateKey,
-                    epoch,
-                    currentEpoch
+                    state,
+                    epoch
                 )
             );
-        } else if (epoch == currentEpoch && !isUnattachedNotVoted()) {
+        }
+
+        // Note that we reset the election timeout after voting for a 
candidate because we
+        // know that the candidate has at least as good of a chance of getting 
elected as us
+        durableTransitionTo(
+            new UnattachedState(
+                time,
+                epoch,
+                state.election().optionalLeaderId(),
+                Optional.of(candidateKey),
+                partitionState.lastVoterSet().voterIds(),
+                state.highWatermark(),
+                randomElectionTimeoutMs(),
+                logContext
+            )
+        );
+        log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch);
+    }
+
+    /**
+     * Grant a vote to a candidate as Prospective. We will transition to 
Prospective with votedKey
+     * state and remain there until either the election timeout expires or we 
discover the leader.
+     */
+    public void prospectiveAddVotedState(
+        int epoch,
+        ReplicaKey candidateKey
+    ) {
+        int currentEpoch = state.epoch();
+        if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d from the 
current state (%s)",
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
+                        "broker.id",

Review Comment:
   Let's fix this formatting.
   ```java
                       "Cannot add voted key (%s) to current state (%s) in 
epoch %d since it matches the local " +
                       "broker.id",
   ```



##########
raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.internals.EpochElection;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.raft.QuorumState.unattachedOrProspectiveCanGrantVote;
+
+public class ProspectiveState implements NomineeState {
+    private final int localId;
+    private final int epoch;
+    private final OptionalInt leaderId;
+    private final Optional<Endpoints> leaderEndpoints;
+    private final Optional<ReplicaKey> votedKey;
+    private final VoterSet voters;
+    private final EpochElection epochElection;
+    private final Optional<LogOffsetMetadata> highWatermark;
+    private final int retries;
+    private final long electionTimeoutMs;
+    private final Timer electionTimer;
+    private final Logger log;
+
+    /**
+     * The lifetime of a prospective state is the following.
+     *
+     * 1. Once started, it will send prevote requests and keep record of the 
received vote responses
+     * 2. If it receives a message denoting a leader with a higher epoch, it 
will transition to follower state.
+     * 3. If majority votes granted, it will transition to candidate state.
+     * 4. If majority votes rejected or election times out, it will transition 
to unattached or follower state
+     *    depending on if it knows the leader id and endpoints or not
+     */
+    public ProspectiveState(
+        Time time,
+        int localId,
+        int epoch,
+        OptionalInt leaderId,
+        Optional<Endpoints> leaderEndpoints,
+        Optional<ReplicaKey> votedKey,
+        VoterSet voters,
+        Optional<LogOffsetMetadata> highWatermark,
+        int retries,
+        int electionTimeoutMs,
+        LogContext logContext
+    ) {
+        this.localId = localId;
+        this.epoch = epoch;
+        this.leaderId = leaderId;
+        this.leaderEndpoints = leaderEndpoints;
+        this.votedKey = votedKey;
+        this.voters = voters;
+        this.highWatermark = highWatermark;
+        this.retries = retries;
+        this.electionTimeoutMs = electionTimeoutMs;
+        this.electionTimer = time.timer(electionTimeoutMs);
+        this.log = logContext.logger(ProspectiveState.class);
+
+        this.epochElection = new EpochElection(voters.voterKeys());
+        epochElection.recordVote(localId, true);
+    }
+
+    public int localId() {
+        return localId;
+    }
+
+    public Optional<ReplicaKey> votedKey() {
+        return votedKey;
+    }
+
+    @Override
+    public EpochElection epochElection() {
+        return epochElection;
+    }
+
+    public int retries() {
+        return retries;
+    }

Review Comment:
   I see. Retries is only preserved when the prospective transition from 
prospective to candidate. The retries are lost if it transitions to unattached.
   
   I think we should file a Jira to remove this exponential backoff. I am 
convinced that it is starting to lose its value with this implementation, 
especially if we make the election timeout improvements you highlight in the KIP.



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -529,26 +589,47 @@ public void transitionToFollower(int epoch, int leaderId, 
Endpoints endpoints) {
         );
     }
 
-    public void transitionToCandidate() {
+    public void transitionToProspective() {
         if (isObserver()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Candidate since the local id (%s) 
and directory id (%s) " +
-                    "is not one of the voters %s",
+                    "Cannot transition to Prospective since the local id (%s) 
and directory id (%s) " +
+                        "is not one of the voters %s",
                     localId,
                     localDirectoryId,
                     partitionState.lastVoterSet()
                 )
             );
-        } else if (isLeader()) {
-            throw new IllegalStateException("Cannot transition to Candidate 
since the local broker.id=" + localId +
-                " since this node is already a Leader with state " + state);
+        } else if (isLeader() || isProspective()) {
+            throw new IllegalStateException("Cannot transition to Prospective 
since the local broker.id=" + localId +
+                " is state " + state);
         }
 
         int retries = isCandidate() ? candidateStateOrThrow().retries() + 1 : 
1;
+
+        durableTransitionTo(new ProspectiveState(
+            time,
+            localIdOrThrow(),
+            epoch(),
+            leaderId(),
+            Optional.of(state.leaderEndpoints()),
+            votedKey(),
+            partitionState.lastVoterSet(),
+            state.highWatermark(),
+            retries,
+            randomElectionTimeoutMs(),
+            logContext
+        ));

Review Comment:
   How about this formatting:
   ```java
           durableTransitionTo(
               new ProspectiveState(
                   time,
                   ...
               )
           );
   ```



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) {
     }
 
     /**
-     * Grant a vote to a candidate. We will transition/remain in Unattached
-     * state until either the election timeout expires or a leader is elected. 
In particular,
-     * we do not begin fetching until the election has concluded and
-     * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
+     * Grant a vote to a candidate as Unattached. We will transition to 
Unattached with votedKey
+     * state and remain there until either the election timeout expires or we 
discover the leader.
      */
-    public void transitionToUnattachedVotedState(
+    public void unattachedAddVotedState(
         int epoch,
         ReplicaKey candidateKey
     ) {
         int currentEpoch = state.epoch();
         if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d since it 
matches the local " +
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
                     "broker.id",
                     candidateKey,
+                    state,
                     epoch
                 )
             );
         } else if (localId.isEmpty()) {
-            throw new IllegalStateException("Cannot transition to voted 
without a replica id");
-        } else if (epoch < currentEpoch) {
+            throw new IllegalStateException("Cannot add voted state without a 
replica id");
+        } else if (epoch != currentEpoch || !isUnattachedNotVoted()) {

Review Comment:
   Should this check that the epoch is not decreasing?



##########
raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.internals.EpochElection;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.raft.QuorumState.unattachedOrProspectiveCanGrantVote;
+
+public class ProspectiveState implements NomineeState {
+    private final int localId;
+    private final int epoch;
+    private final OptionalInt leaderId;
+    private final Optional<Endpoints> leaderEndpoints;
+    private final Optional<ReplicaKey> votedKey;
+    private final VoterSet voters;
+    private final EpochElection epochElection;
+    private final Optional<LogOffsetMetadata> highWatermark;
+    private final int retries;
+    private final long electionTimeoutMs;
+    private final Timer electionTimer;
+    private final Logger log;
+
+    /**
+     * The lifetime of a prospective state is the following.
+     *
+     * 1. Once started, it will send prevote requests and keep record of the 
received vote responses
+     * 2. If it receives a message denoting a leader with a higher epoch, it 
will transition to follower state.
+     * 3. If majority votes granted, it will transition to candidate state.
+     * 4. If majority votes rejected or election times out, it will transition 
to unattached or follower state
+     *    depending on if it knows the leader id and endpoints or not
+     */
+    public ProspectiveState(
+        Time time,
+        int localId,
+        int epoch,
+        OptionalInt leaderId,
+        Optional<Endpoints> leaderEndpoints,
+        Optional<ReplicaKey> votedKey,
+        VoterSet voters,
+        Optional<LogOffsetMetadata> highWatermark,
+        int retries,
+        int electionTimeoutMs,
+        LogContext logContext
+    ) {
+        this.localId = localId;
+        this.epoch = epoch;
+        this.leaderId = leaderId;
+        this.leaderEndpoints = leaderEndpoints;
+        this.votedKey = votedKey;
+        this.voters = voters;
+        this.highWatermark = highWatermark;
+        this.retries = retries;
+        this.electionTimeoutMs = electionTimeoutMs;
+        this.electionTimer = time.timer(electionTimeoutMs);
+        this.log = logContext.logger(ProspectiveState.class);
+
+        this.epochElection = new EpochElection(voters.voterKeys());
+        epochElection.recordVote(localId, true);
+    }
+
+    public int localId() {
+        return localId;
+    }
+
+    public Optional<ReplicaKey> votedKey() {
+        return votedKey;
+    }
+
+    @Override
+    public EpochElection epochElection() {
+        return epochElection;
+    }
+
+    public int retries() {
+        return retries;
+    }
+
+    @Override
+    public boolean recordGrantedVote(int remoteNodeId) {
+        return epochElection().recordVote(remoteNodeId, true);
+    }
+
+    @Override
+    public boolean recordRejectedVote(int remoteNodeId) {
+        if (remoteNodeId == localId) {
+            throw new IllegalArgumentException("Attempted to reject vote from 
ourselves");
+        }
+        return epochElection().recordVote(remoteNodeId, false);
+    }
+
+    @Override
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
+        return unattachedOrProspectiveCanGrantVote(
+            leaderId,
+            votedKey,
+            epoch,
+            replicaKey,
+            isLogUpToDate,
+            isPreVote,
+            log
+        );
+    }
+
+    @Override
+    public boolean hasElectionTimeoutExpired(long currentTimeMs) {
+        electionTimer.update(currentTimeMs);
+        return electionTimer.isExpired();
+    }
+
+    @Override
+    public long remainingElectionTimeMs(long currentTimeMs) {
+        electionTimer.update(currentTimeMs);
+        return electionTimer.remainingMs();
+    }
+
+    @Override
+    public ElectionState election() {
+        return new ElectionState(epoch, leaderId, votedKey, voters.voterIds());
+    }
+
+    @Override
+    public int epoch() {
+        return epoch;
+    }
+
+    @Override
+    public Endpoints leaderEndpoints() {
+        return leaderEndpoints.orElse(Endpoints.empty());
+    }
+
+    @Override
+    public Optional<LogOffsetMetadata> highWatermark() {
+        return highWatermark;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+            "Prospective(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, 
electionTimeoutMs=%s, highWatermark=%s)",

Review Comment:
   Let's also include the epoch election and add a `toString` method to the 
`EpochElection` type.
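   
   A minimal sketch of one possible shape. `EpochElectionSketch` is a 
hypothetical, simplified stand-in for `EpochElection`: it maps plain voter ids 
to a state enum instead of using the PR's `VoterState`, and the output format 
is an assumption rather than the format the PR settles on.
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   
   // Hypothetical, simplified stand-in for EpochElection; not the PR's class.
   final class EpochElectionSketch {
       enum State { UNRECORDED, GRANTED, REJECTED }
   
       // TreeMap keeps voter ids sorted so the rendered string is stable.
       private final Map<Integer, State> voterStates = new TreeMap<>();
   
       EpochElectionSketch(int... voterIds) {
           for (int id : voterIds) {
               voterStates.put(id, State.UNRECORDED);
           }
       }
   
       void recordVote(int voterId, boolean isGranted) {
           if (!voterStates.containsKey(voterId)) {
               throw new IllegalArgumentException(
                   "Attempt to access voter state of non-voter " + voterId
               );
           }
           voterStates.put(voterId, isGranted ? State.GRANTED : State.REJECTED);
       }
   
       @Override
       public String toString() {
           return String.format("EpochElection(voterStates=%s)", voterStates);
       }
   }
   ```
   
   With something like this in place, the state's `toString` could append an 
`epochElection=%s` field and pass the `EpochElection` instance directly.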



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -193,17 +200,6 @@ public void initialize(OffsetAndEpoch 
logEndOffsetAndEpoch) throws IllegalStateE
                 randomElectionTimeoutMs(),
                 logContext
             );
-        } else if (election.hasVoted()) {
-            initialState = new UnattachedState(
-                time,
-                election.epoch(),
-                OptionalInt.empty(),
-                Optional.of(election.votedKey()),
-                partitionState.lastVoterSet().voterIds(),
-                Optional.empty(),
-                randomElectionTimeoutMs(),
-                logContext
-            );

Review Comment:
   Nice code removal! Thanks.



##########
raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ReplicaKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ *  Tracks the votes cast by voters in an election held by a Nominee.
+ */
+public class EpochElection {
+    private Map<Integer, VoterState> voterStates;
+
+    public EpochElection(Set<ReplicaKey> voters) {
+        this.voterStates = voters.stream()
+            .collect(Collectors.toMap(
+                ReplicaKey::id,
+                VoterState::new
+            ));

Review Comment:
   ```java
               .collect(
                   Collectors.toMap(
                       ReplicaKey::id,
                       VoterState::new
                   )
               );
   ```



##########
raft/src/main/java/org/apache/kafka/raft/UnattachedState.java:
##########
@@ -120,60 +116,28 @@ public Optional<LogOffsetMetadata> highWatermark() {
 
     @Override
     public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
-        if (isPreVote) {
-            return canGrantPreVote(replicaKey, isLogUpToDate);
-        } else if (votedKey.isPresent()) {
-            ReplicaKey votedReplicaKey = votedKey.get();
-            if (votedReplicaKey.id() == replicaKey.id()) {
-                return votedReplicaKey.directoryId().isEmpty() || 
votedReplicaKey.directoryId().equals(replicaKey.directoryId());
-            }
-            log.debug(
-                "Rejecting Vote request (preVote=false) from candidate ({}), 
already have voted for another " +
-                    "candidate ({}) in epoch {}",
-                replicaKey,
-                votedKey,
-                epoch
-            );
-            return false;
-        } else if (leaderId.isPresent()) {
-            // If the leader id is known it should behave similar to the 
follower state
-            log.debug(
-                "Rejecting Vote request (preVote=false) from candidate ({}) 
since we already have a leader {} in epoch {}",
-                replicaKey,
-                leaderId,
-                epoch
-            );
-            return false;
-        } else if (!isLogUpToDate) {
-            log.debug(
-                "Rejecting Vote request (preVote=false) from candidate ({}) 
since candidate epoch/offset is not up to date with us",
-                replicaKey
-            );
-        }
-
-        return isLogUpToDate;
-    }
-
-    private boolean canGrantPreVote(ReplicaKey replicaKey, boolean 
isLogUpToDate) {
-        if (!isLogUpToDate) {
-            log.debug(
-                "Rejecting Vote request (preVote=true) from replica ({}) since 
replica's log is not up to date with us",
-                replicaKey
-            );
-        }
-
-        return isLogUpToDate;
+        return unattachedOrProspectiveCanGrantVote(
+            leaderId,
+            votedKey,
+            epoch,
+            replicaKey,
+            isLogUpToDate,
+            isPreVote,
+            log
+        );
     }
 
     @Override
     public String toString() {
-        return "Unattached(" +
-            "epoch=" + epoch +
-            ", votedKey=" + votedKey.map(ReplicaKey::toString).orElse("null") +
-            ", voters=" + voters +
-            ", electionTimeoutMs=" + electionTimeoutMs +
-            ", highWatermark=" + highWatermark +
-            ')';
+        return String.format("Unattached(epoch=%d, leaderId=%s, votedKey=%s, 
voters=%s, " +
+            "electionTimeoutMs=%d, highWatermark=%s)",

Review Comment:
   ```java
           return String.format(
               "Unattached(epoch=%d, leaderId=%s, votedKey=%s, voters=%s, " +
               "electionTimeoutMs=%d, highWatermark=%s)",
   ```



##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java:
##########
@@ -71,10 +71,14 @@ public KafkaRaftMetrics(Metrics metrics, String 
metricGrpPrefix, QuorumState sta
         Gauge<String> stateProvider = (mConfig, currentTimeMs) -> {
             if (state.isLeader()) {
                 return "leader";
+            } else if (state.isProspectiveNotVoted()) {
+                return "prospective";
+            } else if (state.isProspectiveAndVoted()) {
+                return "prospective-voted";
             } else if (state.isCandidate()) {
                 return "candidate";
             } else if (state.isUnattachedAndVoted()) {
-                return "voted";
+                return "unattached-voted";

Review Comment:
   This is a publicly visible change. Let's update the KIP if it doesn't 
include this change.



##########
raft/src/main/java/org/apache/kafka/raft/UnattachedState.java:
##########
@@ -71,13 +73,7 @@ public UnattachedState(
 
     @Override
     public ElectionState election() {
-        if (votedKey.isPresent()) {
-            return ElectionState.withVotedCandidate(epoch, votedKey().get(), 
voters);
-        } else if (leaderId.isPresent()) {
-            return ElectionState.withElectedLeader(epoch, leaderId.getAsInt(), 
voters);
-        } else {
-            return ElectionState.withUnknownLeader(epoch, voters);
-        }
+        return new ElectionState(epoch, leaderId, votedKey, voters);

Review Comment:
   Let's undo this change. It is good to keep the existing invariant to avoid 
persisting both `leaderId` and `votedKey`.
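   
   To illustrate the invariant, here is a sketch that mirrors the branch order 
of the removed lines. `PersistedElectionSketch` and its `from` factory are 
hypothetical stand-ins (with a `String` in place of `ReplicaKey`), not 
Kafka's `ElectionState`; the assumption is that the voted key takes 
precedence and the leader id is dropped when both are known.
   
   ```java
   import java.util.Optional;
   import java.util.OptionalInt;
   
   // Hypothetical stand-in for the persisted election state.
   final class PersistedElectionSketch {
       final int epoch;
       final OptionalInt leaderId;
       final Optional<String> votedKey; // stand-in for Optional<ReplicaKey>
   
       private PersistedElectionSketch(
           int epoch,
           OptionalInt leaderId,
           Optional<String> votedKey
       ) {
           this.epoch = epoch;
           this.leaderId = leaderId;
           this.votedKey = votedKey;
       }
   
       // At most one of leaderId and votedKey survives into the result.
       static PersistedElectionSketch from(
           int epoch,
           OptionalInt leaderId,
           Optional<String> votedKey
       ) {
           if (votedKey.isPresent()) {
               return new PersistedElectionSketch(epoch, OptionalInt.empty(), votedKey);
           } else if (leaderId.isPresent()) {
               return new PersistedElectionSketch(epoch, leaderId, Optional.empty());
           } else {
               return new PersistedElectionSketch(epoch, OptionalInt.empty(), Optional.empty());
           }
       }
   }
   ```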



##########
raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ReplicaKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ *  Tracks the votes cast by voters in an election held by a Nominee.
+ */
+public class EpochElection {
+    private Map<Integer, VoterState> voterStates;
+
+    public EpochElection(Set<ReplicaKey> voters) {
+        this.voterStates = voters.stream()
+            .collect(Collectors.toMap(
+                ReplicaKey::id,
+                VoterState::new
+            ));
+    }
+
+    /**
+     * Record a vote from a voter.
+     * @param voterId The id of the voter
+     * @param isGranted true if the vote is granted, false if it is rejected
+     * @return true if the voter had not been previously recorded
+     */
+    public boolean recordVote(int voterId, boolean isGranted) {
+        boolean wasUnrecorded = false;
+        VoterState voterState = getVoterStateOrThrow(voterId);
+        if (voterState.state == VoterState.State.UNRECORDED) {
+            wasUnrecorded = true;
+        }
+        if (isGranted) {
+            voterState.setState(VoterState.State.GRANTED);
+        } else {
+            voterState.setState(VoterState.State.REJECTED);
+        }
+        return wasUnrecorded;
+    }
+
+    /**
+     * Returns if a voter has granted the vote.
+     * @param voterId The id of the voter
+     * @throws IllegalArgumentException if the voter is not in the set of 
voters
+     */
+    public boolean isGrantedVoter(int voterId) {
+        return getVoterStateOrThrow(voterId).state == VoterState.State.GRANTED;
+    }
+
+    /**
+     * Returns if a voter has rejected the vote.
+     * @param voterId The id of the voter
+     * @throws IllegalArgumentException if the voter is not in the set of 
voters
+     */
+    public boolean isRejectedVoter(int voterId) {
+        return getVoterStateOrThrow(voterId).state == 
VoterState.State.REJECTED;
+    }
+
+    /**
+     * The set of voter ids.
+     */
+    public Set<Integer> voterIds() {
+        return Collections.unmodifiableSet(voterStates.keySet());
+    }
+
+    /**
+     * Get the collection of voter states.
+     */
+    public Collection<VoterState> voterStates() {
+        return Collections.unmodifiableCollection(voterStates.values());
+    }
+
+    /**
+     * Check whether we have received enough votes to conclude the election 
and become leader.
+     *
+     * @return true if at least a majority of nodes have granted the vote
+     */
+    public boolean isVoteGranted() {
+        return numGranted() >= majoritySize();
+    }
+
+    /**
+     * Check if we have received enough rejections that it is no longer 
possible to reach a
+     * majority of grants.
+     *
+     * @return true if the vote is rejected, false if the vote is already or 
can still be granted
+     */
+    public boolean isVoteRejected() {
+        return numGranted() + numUnrecorded() < majoritySize();
+    }
+
+    /**
+     * Get the set of voters which have not been counted as granted or 
rejected yet.
+     *
+     * @return The set of unrecorded voters
+     */
+    public Set<ReplicaKey> unrecordedVoters() {
+        return 
votersOfState(VoterState.State.UNRECORDED).collect(Collectors.toSet());
+    }
+
+    /**
+     * Get the set of voters that have granted our vote requests.
+     *
+     * @return The set of granting voters, which should always contain the 
localId
+     */
+    public Set<Integer> grantingVoters() {
+        return 
votersOfState(VoterState.State.GRANTED).map(ReplicaKey::id).collect(Collectors.toSet());
+    }
+
+    /**
+     * Get the set of voters that have rejected our candidacy.
+     *
+     * @return The set of rejecting voters
+     */
+    public Set<Integer> rejectingVoters() {
+        return 
votersOfState(VoterState.State.REJECTED).map(ReplicaKey::id).collect(Collectors.toSet());
+    }
+
+    private VoterState getVoterStateOrThrow(int voterId) {
+        VoterState voterState = voterStates.get(voterId);
+        if (voterState == null) {
+            throw new IllegalArgumentException("Attempt to access voter state 
of non-voter " + voterId);
+        }
+        return voterState;
+    }
+
+    private Stream<ReplicaKey> votersOfState(VoterState.State state) {
+        return voterStates
+            .values()
+            .stream()
+            .filter(voterState -> voterState.state().equals(state))
+            .map(VoterState::replicaKey);
+    }
+
+    private long numGranted() {
+        return votersOfState(VoterState.State.GRANTED).count();
+    }
+
+    private long numUnrecorded() {
+        return votersOfState(VoterState.State.UNRECORDED).count();
+    }
+
+    private int majoritySize() {
+        return voterStates.size() / 2 + 1;
+    }
+
+    private static final class VoterState {

Review Comment:
   Let's add a `toString` method to this type so that its value is included in 
log messages.
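   
   Something along these lines would do, as a rough sketch. `VoterStateSketch` 
is a hypothetical stand-in for the private `VoterState` class, with a `String` 
in place of `ReplicaKey`; the field names in the output are an assumption.
   
   ```java
   // Hypothetical stand-in for the private EpochElection.VoterState class.
   final class VoterStateSketch {
       enum State { UNRECORDED, GRANTED, REJECTED }
   
       private final String replicaKey; // stand-in for ReplicaKey
       private State state = State.UNRECORDED;
   
       VoterStateSketch(String replicaKey) {
           this.replicaKey = replicaKey;
       }
   
       void setState(State state) {
           this.state = state;
       }
   
       @Override
       public String toString() {
           return String.format(
               "VoterState(replicaKey=%s, state=%s)",
               replicaKey,
               state
           );
       }
   }
   ```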



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -957,13 +998,33 @@ private boolean handleVoteResponse(
         }
     }
 
+    private void maybeCandidateStartBackingOff(long currentTimeMs) {

Review Comment:
   This code only has one caller. How about manually inlining it at the 
call site?



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) {
     }
 
     /**
-     * Grant a vote to a candidate. We will transition/remain in Unattached
-     * state until either the election timeout expires or a leader is elected. 
In particular,
-     * we do not begin fetching until the election has concluded and
-     * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
+     * Grant a vote to a candidate as Unattached. We will transition to 
Unattached with votedKey
+     * state and remain there until either the election timeout expires or we 
discover the leader.
      */
-    public void transitionToUnattachedVotedState(
+    public void unattachedAddVotedState(
         int epoch,
         ReplicaKey candidateKey
     ) {
         int currentEpoch = state.epoch();
         if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d since it 
matches the local " +
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
                     "broker.id",
                     candidateKey,
+                    state,
                     epoch
                 )
             );
         } else if (localId.isEmpty()) {
-            throw new IllegalStateException("Cannot transition to voted 
without a replica id");
-        } else if (epoch < currentEpoch) {
+            throw new IllegalStateException("Cannot add voted state without a 
replica id");
+        } else if (epoch != currentEpoch || !isUnattachedNotVoted()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d since the 
current epoch " +
-                    "(%d) is larger",
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d",
                     candidateKey,
-                    epoch,
-                    currentEpoch
+                    state,
+                    epoch
                 )
             );
-        } else if (epoch == currentEpoch && !isUnattachedNotVoted()) {
+        }
+
+        // Note that we reset the election timeout after voting for a 
candidate because we
+        // know that the candidate has at least as good of a chance of getting 
elected as us
+        durableTransitionTo(
+            new UnattachedState(
+                time,
+                epoch,
+                state.election().optionalLeaderId(),
+                Optional.of(candidateKey),
+                partitionState.lastVoterSet().voterIds(),
+                state.highWatermark(),
+                randomElectionTimeoutMs(),
+                logContext
+            )
+        );
+        log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch);
+    }
+
+    /**
+     * Grant a vote to a candidate as Prospective. We will transition to 
Prospective with votedKey
+     * state and remain there until either the election timeout expires or we 
discover the leader.
+     */
+    public void prospectiveAddVotedState(
+        int epoch,
+        ReplicaKey candidateKey
+    ) {
+        int currentEpoch = state.epoch();
+        if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Voted for %s and epoch %d from the 
current state (%s)",
+                    "Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
+                        "broker.id",
                     candidateKey,
-                    epoch,
-                    state
+                    state,
+                    epoch
+                )
+            );
+        } else if (localId.isEmpty()) {
+            throw new IllegalStateException("Cannot add voted state without a 
replica id");
+        } else if (epoch != currentEpoch || !isProspectiveNotVoted()) {

Review Comment:
   This should check that the epoch is not decreasing.
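   
   One reading of this request, sketched with a hypothetical helper: reject 
only a decreasing epoch, mirroring the `epoch < currentEpoch` branch that the 
diff removes. `EpochCheckSketch` and the message text are assumptions; the PR 
performs its validation inline in `QuorumState`.
   
   ```java
   // Hypothetical helper; not part of the PR.
   final class EpochCheckSketch {
       // Throws if the requested epoch is smaller than the current epoch.
       static void checkEpochNotDecreasing(int epoch, int currentEpoch) {
           if (epoch < currentEpoch) {
               throw new IllegalStateException(
                   String.format(
                       "Cannot add voted key in epoch %d since the current epoch (%d) is larger",
                       epoch,
                       currentEpoch
                   )
               );
           }
       }
   }
   ```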



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -529,26 +589,47 @@ public void transitionToFollower(int epoch, int leaderId, 
Endpoints endpoints) {
         );
     }
 
-    public void transitionToCandidate() {
+    public void transitionToProspective() {
         if (isObserver()) {
             throw new IllegalStateException(
                 String.format(
-                    "Cannot transition to Candidate since the local id (%s) 
and directory id (%s) " +
-                    "is not one of the voters %s",
+                    "Cannot transition to Prospective since the local id (%s) 
and directory id (%s) " +
+                        "is not one of the voters %s",

Review Comment:
   Fix formatting.



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -718,7 +849,64 @@ public boolean isResigned() {
         return state instanceof ResignedState;
     }
 
+    public boolean isProspective() {
+        return state instanceof ProspectiveState;
+    }
+
     public boolean isCandidate() {
         return state instanceof CandidateState;
     }
+
+    public boolean isNomineeState() {
+        return state instanceof NomineeState;
+    }
+
+    public static boolean unattachedOrProspectiveCanGrantVote(

Review Comment:
   Let's add a Javadoc comment to this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
