ahuang98 commented on code in PR #18240: URL: https://github.com/apache/kafka/pull/18240#discussion_r1899727021
########## raft/src/main/java/org/apache/kafka/raft/QuorumState.java: ########## @@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) { } /** - * Grant a vote to a candidate. We will transition/remain in Unattached - * state until either the election timeout expires or a leader is elected. In particular, - * we do not begin fetching until the election has concluded and - * {@link #transitionToFollower(int, int, Endpoints)} is invoked. + * Grant a vote to a candidate as Unattached. We will transition to Unattached with votedKey + * state and remain there until either the election timeout expires or we discover the leader. */ - public void transitionToUnattachedVotedState( + public void unattachedAddVotedState( int epoch, ReplicaKey candidateKey ) { int currentEpoch = state.epoch(); if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since it matches the local " + + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + "broker.id", candidateKey, + state, epoch ) ); } else if (localId.isEmpty()) { - throw new IllegalStateException("Cannot transition to voted without a replica id"); - } else if (epoch < currentEpoch) { + throw new IllegalStateException("Cannot add voted state without a replica id"); + } else if (epoch != currentEpoch || !isUnattachedNotVoted()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d since the current epoch " + - "(%d) is larger", + "Cannot add voted key (%s) to current state (%s) in epoch %d", candidateKey, - epoch, - currentEpoch + state, + epoch ) ); - } else if (epoch == currentEpoch && !isUnattachedNotVoted()) { + } + + // Note that we reset the election timeout after voting for a candidate because we + // know that the candidate has at least as good of a chance of getting elected as us + durableTransitionTo( + new UnattachedState( + time, + epoch, + state.election().optionalLeaderId(), + Optional.of(candidateKey), + partitionState.lastVoterSet().voterIds(), + state.highWatermark(), + randomElectionTimeoutMs(), + logContext + ) + ); + log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch); + } + + /** + * Grant a vote to a candidate as Prospective. We will transition to Prospective with votedKey + * state and remain there until either the election timeout expires or we discover the leader. + */ + public void prospectiveAddVotedState( + int epoch, + ReplicaKey candidateKey + ) { + int currentEpoch = state.epoch(); + if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( String.format( - "Cannot transition to Voted for %s and epoch %d from the current state (%s)", + "Cannot add voted key (%s) to current state (%s) in epoch %d since it matches the local " + + "broker.id", candidateKey, - epoch, - state + state, + epoch + ) + ); + } else if (localId.isEmpty()) { + throw new IllegalStateException("Cannot add voted state without a replica id"); + } else if (epoch != currentEpoch || !isProspectiveNotVoted()) { Review Comment: discussed offline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org