jsancio commented on code in PR #17807: URL: https://github.com/apache/kafka/pull/17807#discussion_r1878851307
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,51 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0);
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
+        if (isIllegalEpoch) {
+            logger.info(
+                "Received a vote request from replica {} with illegal epoch {} and last epoch {}",
+                replicaId,
+                replicaEpoch,
+                lastEpoch
+            );
+        }
+        if (lastEpochEndOffset < 0 || lastEpoch < 0 || isIllegalEpoch) {
             return buildVoteResponse(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
                 Errors.INVALID_REQUEST,
-                false
+                false,
+                preVote
             );
         }
-        Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
+        Optional<Errors> errorOpt = validateVoterOnlyRequest(replicaId, replicaEpoch);
         if (errorOpt.isPresent()) {
             return buildVoteResponse(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
                 errorOpt.get(),
-                false
+                false,
+                preVote

Review Comment:
I am starting to think that we should have removed `preVote` from the response schema.

##########
raft/src/main/java/org/apache/kafka/raft/EpochState.java:
##########
@@ -26,16 +26,18 @@ default Optional<LogOffsetMetadata> highWatermark() {
     }
     /**
-     * Decide whether to grant a vote to a candidate.
+     * Decide whether to grant a vote to a replica.
      *
      * It is the responsibility of the caller to invoke
      * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if vote is granted.
      *
-     * @param candidateKey the id and directory of the candidate
-     * @param isLogUpToDate whether the candidate’s log is at least as up-to-date as receiver’s log
+     * @param replicaKey the id and directory of the replica requesting the vote
+     * @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log
      * @return true if it can grant the vote, false otherwise
      */
-    boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate);
+    boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate);
+
+    boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate);

Review Comment:
Let's add a Javadoc. I am interested to see how this method differs from `canGrantVote`. Having said that, did you consider having one method with this signature: `canGrantVote(ReplicaKey, boolean isLogUpToDate, boolean isPreVote)`? If yes, why did you reject this interface change?

##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -44,7 +44,7 @@
  * Follower: After discovering a leader with an equal or larger epoch
  *
  * Unattached transitions to:
- * Unattached: After learning of a new election with a higher epoch or after voting
+ * Unattached: After learning of a new election with a higher epoch or after giving a binding vote

Review Comment:
This applies to both the "Unattached transitions" and the "Voted transitions" sections. Should we merge these two sections and update the wording, since in a previous PR we merged these two states?
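To make the `EpochState` question above concrete (two methods versus one `canGrantVote(ReplicaKey, boolean isLogUpToDate, boolean isPreVote)`), here is a minimal, self-contained sketch. Every name in it is hypothetical: `VoteKey` is a simplified stand-in for Kafka's `ReplicaKey`, and `VoteGrantingState` / `VotedSketchState` are illustrative, not classes from the PR. The only behavior taken from the diff is that a granted pre-vote does not change local state (the caller only transitions to the voted state when `!preVote`); the pre-vote policy in `VotedSketchState` is an assumption chosen for illustration.

```java
/** Hypothetical, simplified stand-in for Kafka's ReplicaKey. */
record VoteKey(int id, String directoryId) { }

interface VoteGrantingState {
    /**
     * Decide whether to grant a binding vote. If this returns true, the caller
     * is expected to transition to the voted state for the current epoch.
     */
    boolean canGrantVote(VoteKey replicaKey, boolean isLogUpToDate);

    /**
     * Decide whether to grant a pre-vote. A pre-vote is non-binding: granting it
     * must not change any local state, so it never triggers a state transition.
     */
    boolean canGrantPreVote(VoteKey replicaKey, boolean isLogUpToDate);

    /** Single entry point, analogous to the three-argument method under review. */
    default boolean canGrantVote(VoteKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
        return isPreVote
            ? canGrantPreVote(replicaKey, isLogUpToDate)
            : canGrantVote(replicaKey, isLogUpToDate);
    }
}

/** Hypothetical state that has already given its binding vote in this epoch. */
final class VotedSketchState implements VoteGrantingState {
    private final VoteKey votedKey;

    VotedSketchState(VoteKey votedKey) {
        this.votedKey = votedKey;
    }

    @Override
    public boolean canGrantVote(VoteKey replicaKey, boolean isLogUpToDate) {
        // At most one binding vote per epoch: only re-grant it to the same replica.
        return votedKey.equals(replicaKey);
    }

    @Override
    public boolean canGrantPreVote(VoteKey replicaKey, boolean isLogUpToDate) {
        // Assumed policy for illustration only: pre-votes depend on log freshness.
        return isLogUpToDate;
    }
}
```

With this shape, each state documents its own pre-vote rules in a separate method, while callers such as `QuorumState` still get one entry point. The single-method alternative would instead move an `if (isPreVote)` branch into every state implementation.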
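The `isIllegalEpoch` expression in the first `KafkaRaftClient.java` hunk above is the subject of two review comments in this thread: one asks for an explanatory comment, the other asks to log `preVote`. The sketch below suggests one way to address both. The class `VoteEpochCheck` and its method names are hypothetical. The comparison is copied from the diff. The rationale in the Javadoc is the standard Raft/pre-vote reasoning: a candidate bumps its epoch before sending a binding Vote request, while a pre-vote is sent without bumping it. `String.format` stands in for the SLF4J `logger.info` call so the sketch runs without dependencies.

```java
final class VoteEpochCheck {
    private VoteEpochCheck() { }

    /**
     * Returns true if the epochs in a vote request are inconsistent.
     *
     * A candidate increments its epoch before sending a binding Vote request, so
     * the epoch of its last log entry must be strictly lower than the epoch it is
     * campaigning in. A PreVote request is sent without incrementing the epoch, so
     * the last log entry may have been written in the replica's current epoch;
     * it just cannot be newer than that epoch.
     */
    static boolean isIllegalEpoch(boolean preVote, int replicaEpoch, int lastEpoch) {
        return preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
    }

    /** Log message that includes preVote, since it decides which bound applies. */
    static String describeIllegalEpoch(int replicaId, int replicaEpoch, int lastEpoch, boolean preVote) {
        return String.format(
            "Received a vote request from replica %d with illegal epoch %d, last epoch %d and preVote %b",
            replicaId, replicaEpoch, lastEpoch, preVote);
    }
}
```

For example, with `replicaEpoch = 5` and `lastEpoch = 5`, the request is illegal as a binding vote but acceptable as a pre-vote. Without `preVote` in the log line, the two cases look identical.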
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,43 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0);
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;

Review Comment:
Can you write a comment explaining this check/boolean?

##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -46,6 +46,7 @@ public class FollowerState implements EpochState {
     private final Timer updateVoterPeriodTimer;
     private final Logger log;
+    private boolean hasFetchedFromLeader;

Review Comment:
Maybe move this right below `fetchTimer`. It would also be useful to document this field a bit.

##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -641,8 +641,11 @@ private int randomElectionTimeoutMs() {
         return electionTimeoutMs + random.nextInt(electionTimeoutMs);
     }
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
-        return state.canGrantVote(candidateKey, isLogUpToDate);
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
+        if (isPreVote) {
+            return state.canGrantPreVote(replicaKey, isLogUpToDate);
+        }
+        return state.canGrantVote(replicaKey, isLogUpToDate);

Review Comment:
Minor, and feel free to ignore, but is this more readable?
```java
public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
    return isPreVote ?
        state.canGrantPreVote(replicaKey, isLogUpToDate) :
        state.canGrantVote(replicaKey, isLogUpToDate);
}
```

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -822,30 +836,36 @@ private VoteResponseData handleVoteRequest(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
                 Errors.INVALID_VOTER_KEY,
-                false
+                false,
+                preVote
             );
         }
         OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
-        ReplicaKey candidateKey = ReplicaKey.of(
-            candidateId,
-            partitionRequest.candidateDirectoryId()
+        ReplicaKey replicaKey = ReplicaKey.of(
+            replicaId,
+            partitionRequest.replicaDirectoryId()
         );
         boolean voteGranted = quorum.canGrantVote(
-            candidateKey,
-            lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0
+            replicaKey,
+            lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0,
+            preVote
         );
-        if (voteGranted && quorum.isUnattachedNotVoted()) {
-            transitionToUnattachedVoted(candidateKey, candidateEpoch);
+        if (!preVote && voteGranted && quorum.isUnattachedNotVoted()) {
+            transitionToUnattachedVoted(replicaKey, replicaEpoch);
         }
-        logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
+        logger.info("Vote request {} with epoch {} is {}",
+            request,
+            replicaEpoch,
+            voteGranted ? "granted" : "rejected");

Review Comment:
How about this formatting:
```java
logger.info(
    "Vote request {} with epoch {} is {}",
    request,
    replicaEpoch,
    voteGranted ? "granted" : "rejected"
);
```

##########
raft/src/main/java/org/apache/kafka/raft/ResignedState.java:
##########
@@ -140,10 +140,21 @@ public List<ReplicaKey> preferredSuccessors() {
     }
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
         log.debug(
-            "Rejecting vote request from candidate ({}) since we have resigned as candidate/leader in epoch {}",
-            candidateKey,
+            "Rejecting Vote request from candidate ({}) since we have resigned as leader in epoch {}",
+            replicaKey,
+            epoch
+        );
+
+        return false;
+    }
+
+    @Override
+    public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {

Review Comment:
Did we agree that `canGrantPreVote` returns true in the resigned state if the log is up to date?

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,51 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0);
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
+        if (isIllegalEpoch) {
+            logger.info(
+                "Received a vote request from replica {} with illegal epoch {} and last epoch {}",
+                replicaId,
+                replicaEpoch,
+                lastEpoch

Review Comment:
We should also log the value of `preVote`. It matters because `preVote` determines which comparison `isIllegalEpoch` uses.
##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -202,16 +205,34 @@ public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
     }
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
         log.debug(
             "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}",
-            candidateKey,
+            replicaKey,
             leaderId,
             epoch
         );
         return false;
     }
+    @Override
+    public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
+        if (hasFetchedFromLeader) {
+            log.debug(
+                "Rejecting PreVote request from replica ({}) since we already have a leader {} in epoch {}",
+                replicaKey,
+                leaderId,
+                epoch
+            );
+            return false;
+        } else if (!isLogUpToDate) {
+            log.debug(
+                "Rejecting PreVote request from replica ({}) since replica epoch/offset is not up to date with us",
+                replicaKey);
+        }
+        return isLogUpToDate;
+    }

Review Comment:
This method has a lot of lines because we are trying to generate a very specific log message. I don't think we need that, since debug messages are for developers. How about:
```java
@Override
public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
    boolean granting = !hasFetchedFromLeader && isLogUpToDate;
    if (!granting) {
        log.debug(
            "Rejecting PreVote request from replica ({}) since leader {}, epoch is {}, isLogUpToDate is {} and hasFetched is {}",
            replicaKey,
            leaderId,
            epoch,
            isLogUpToDate,
            hasFetchedFromLeader
        );
    }
    return granting;
}
```

##########
raft/src/main/java/org/apache/kafka/raft/RaftUtil.java:
##########
@@ -190,19 +192,24 @@ public static VoteResponseData singletonVoteResponse(
         int leaderEpoch,
         int leaderId,
         boolean voteGranted,
+        boolean preVote,
         Endpoints endpoints
     ) {
+        VoteResponseData.PartitionData partitionData = new VoteResponseData.PartitionData()
+            .setErrorCode(partitionLevelError.code())
+            .setLeaderId(leaderId)
+            .setLeaderEpoch(leaderEpoch)
+            .setVoteGranted(voteGranted);
+        if (apiVersion >= 2) {
+            partitionData.setPreVote(preVote);
+        }

Review Comment:
I am pretty convinced that we should remove the `preVote` field from the response. What do you think?
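The `FollowerState` comments in this thread ask to document `hasFetchedFromLeader` and to collapse `canGrantPreVote` into a single condition. Both requests can be illustrated in one small, self-contained sketch. `FollowerPreVoteSketch` and its `recordSuccessfulFetch` hook are hypothetical. The sketch assumes the flag is set after the first successful fetch from the leader in the current epoch; the PR may set it elsewhere. `java.util.logging` at `FINE` level stands in for Kafka's debug logger. The grant rule itself is the one suggested in the review: `!hasFetchedFromLeader && isLogUpToDate`.

```java
import java.util.logging.Logger;

final class FollowerPreVoteSketch {
    private static final Logger LOG = Logger.getLogger(FollowerPreVoteSketch.class.getName());

    private final int leaderId;
    private final int epoch;

    /**
     * True once this follower has successfully fetched from {@code leaderId} in
     * {@code epoch}. While it is true, PreVote requests are rejected because this
     * follower has already heard from the leader. Before the first successful
     * fetch, a PreVote is granted to any replica whose log is up to date.
     */
    private boolean hasFetchedFromLeader;

    FollowerPreVoteSketch(int leaderId, int epoch) {
        this.leaderId = leaderId;
        this.epoch = epoch;
    }

    /** Hypothetical hook: call after a successful fetch response from the leader. */
    void recordSuccessfulFetch() {
        hasFetchedFromLeader = true;
    }

    boolean canGrantPreVote(int replicaId, boolean isLogUpToDate) {
        boolean granting = !hasFetchedFromLeader && isLogUpToDate;
        if (!granting) {
            // One debug line with all the inputs instead of one message per rejection reason.
            LOG.fine(() -> String.format(
                "Rejecting PreVote request from replica (%d) since leader %d, epoch is %d, isLogUpToDate is %b and hasFetched is %b",
                replicaId, leaderId, epoch, isLogUpToDate, hasFetchedFromLeader));
        }
        return granting;
    }
}
```

Logging every input in one message keeps the method short and still lets a developer see why a request was rejected.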