jsancio commented on code in PR #17807:
URL: https://github.com/apache/kafka/pull/17807#discussion_r1878851307


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,51 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest =
             request.topics().get(0).partitions().get(0);
 
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
 
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
+        if (isIllegalEpoch) {
+            logger.info(
+                "Received a vote request from replica {} with illegal epoch {} and last epoch {}",
+                replicaId,
+                replicaEpoch,
+                lastEpoch
+            );
+        }
+        if (lastEpochEndOffset < 0 || lastEpoch < 0 || isIllegalEpoch) {
             return buildVoteResponse(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
                 Errors.INVALID_REQUEST,
-                false
+                false,
+                preVote
             );
         }
 
-        Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
+        Optional<Errors> errorOpt = validateVoterOnlyRequest(replicaId, replicaEpoch);
         if (errorOpt.isPresent()) {
             return buildVoteResponse(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
                 errorOpt.get(),
-                false
+                false,
+                preVote

Review Comment:
   I am starting to think that we should have removed `preVote` from the response schema.



##########
raft/src/main/java/org/apache/kafka/raft/EpochState.java:
##########
@@ -26,16 +26,18 @@ default Optional<LogOffsetMetadata> highWatermark() {
     }
 
     /**
-     * Decide whether to grant a vote to a candidate.
+     * Decide whether to grant a vote to a replica.
      *
      * It is the responsibility of the caller to invoke
      * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if vote is granted.
      *
-     * @param candidateKey the id and directory of the candidate
-     * @param isLogUpToDate whether the candidate’s log is at least as up-to-date as receiver’s log
+     * @param replicaKey the id and directory of the replica requesting the vote
+     * @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log
      * @return true if it can grant the vote, false otherwise
      */
-    boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate);
+    boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate);
+
+    boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate);

Review Comment:
   Let's add a Javadoc. I am interested to see how this method differs from `canGrantVote`.
   
   Having said that, did you consider having one method with this signature: `canGrantVote(ReplicaKey, boolean isLogUpToDate, boolean isPreVote)`? If so, why did you reject this interface change?
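   For comparison, here is a minimal sketch of the single-method alternative, including the kind of Javadoc that would explain how a PreVote differs from a binding Vote. It rests on several assumptions. `ReplicaKey` is a hypothetical stand-in for `org.apache.kafka.raft.ReplicaKey`, with a `String` in place of the real directory id type. `VoteGranter` and `followerLike` are hypothetical names. The follower rule is copied from the `FollowerState` change in this diff.
   
   ```java
   import java.util.Optional;
   
   public final class SingleGrantMethodSketch {
       /** Hypothetical stand-in for org.apache.kafka.raft.ReplicaKey: replica id plus optional directory id. */
       record ReplicaKey(int id, Optional<String> directoryId) { }
   
       /** Hypothetical single-method version of the vote-granting part of EpochState. */
       @FunctionalInterface
       interface VoteGranter {
           /**
            * Decide whether to grant a vote to a replica.
            *
            * A binding Vote (isPreVote == false) comes from a candidate. If it is granted,
            * the caller is responsible for persisting the vote by transitioning to the
            * unattached voted state.
            *
            * A PreVote (isPreVote == true) is non-binding. Granting it does not change the
            * receiver's state; it only signals that the receiver would be willing to vote
            * for the replica if that replica became a candidate.
            *
            * @param replicaKey the id and directory of the replica requesting the vote
            * @param isLogUpToDate whether the replica's log is at least as up-to-date as the receiver's log
            * @param isPreVote whether the request is a PreVote rather than a binding Vote
            * @return true if it can grant the vote, false otherwise
            */
           boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote);
       }
   
       /** Follower rule from this diff, expressed through the single method. */
       static VoteGranter followerLike(boolean hasFetchedFromLeader) {
           return (replicaKey, isLogUpToDate, isPreVote) ->
               // A follower never grants a binding vote because it already has a leader in this epoch.
               // It grants a PreVote only if it has not fetched from the leader and the log is up to date.
               isPreVote && !hasFetchedFromLeader && isLogUpToDate;
       }
   
       private SingleGrantMethodSketch() { }
   }
   ```
   
   The trade-off is that one boolean parameter keeps the interface smaller, but every state must then branch on `isPreVote` internally. Two methods let each state document its two rules separately.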



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -44,7 +44,7 @@
  *    Follower: After discovering a leader with an equal or larger epoch
  *
  * Unattached transitions to:
- *    Unattached: After learning of a new election with a higher epoch or after voting
+ *    Unattached: After learning of a new election with a higher epoch or after giving a binding vote

Review Comment:
   This applies to the "Unattached transitions" and "Voted transitions" sections.
   
   Should we merge these two sections and update the wording, since in a previous PR we merged these two states?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,43 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest =
             request.topics().get(0).partitions().get(0);
 
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
 
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;

Review Comment:
   Can you write a comment explaining this check/boolean?
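   One possible wording for that comment is sketched below as a small standalone helper. It assumes, consistent with the two comparisons in this diff, that a candidate increments its epoch before sending a binding Vote, while a replica sending a PreVote keeps its current epoch. `EpochChecks` and `isIllegalEpoch(...)` as a method are hypothetical; in the PR the check is an inline boolean.
   
   ```java
   public final class EpochChecks {
       private EpochChecks() { }
   
       /**
        * Returns true if the epoch of the requester's last log entry is inconsistent with
        * the epoch in the Vote request.
        *
        * A candidate increments its epoch before sending a binding Vote, so every entry in
        * its log comes from a strictly earlier epoch: lastEpoch >= replicaEpoch is illegal.
        *
        * A replica sending a PreVote does not increment its epoch, so its last entry may
        * come from the current epoch: only lastEpoch > replicaEpoch is illegal.
        */
       public static boolean isIllegalEpoch(boolean preVote, int lastEpoch, int replicaEpoch) {
           return preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
       }
   }
   ```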



##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -46,6 +46,7 @@ public class FollowerState implements EpochState {
     private final Timer updateVoterPeriodTimer;
 
     private final Logger log;
+    private boolean hasFetchedFromLeader;

Review Comment:
   Maybe move this right below `fetchTimer`. It would also be useful to document this field a bit.



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -641,8 +641,11 @@ private int randomElectionTimeoutMs() {
         return electionTimeoutMs + random.nextInt(electionTimeoutMs);
     }
 
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
-        return state.canGrantVote(candidateKey, isLogUpToDate);
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
+        if (isPreVote) {
+            return state.canGrantPreVote(replicaKey, isLogUpToDate);
+        }
+        return state.canGrantVote(replicaKey, isLogUpToDate);

Review Comment:
   Minor, and feel free to ignore, but is this more readable?
   ```java
       public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
           return isPreVote ?
               state.canGrantPreVote(replicaKey, isLogUpToDate) :
               state.canGrantVote(replicaKey, isLogUpToDate);
       }
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -822,30 +836,36 @@ private VoteResponseData handleVoteRequest(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
                 Errors.INVALID_VOTER_KEY,
-                false
+                false,
+                preVote
             );
         }
 
         OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
-        ReplicaKey candidateKey = ReplicaKey.of(
-            candidateId,
-            partitionRequest.candidateDirectoryId()
+        ReplicaKey replicaKey = ReplicaKey.of(
+            replicaId,
+            partitionRequest.replicaDirectoryId()
         );
         boolean voteGranted = quorum.canGrantVote(
-            candidateKey,
-            lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0
+            replicaKey,
+            lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0,
+            preVote
         );
 
-        if (voteGranted && quorum.isUnattachedNotVoted()) {
-            transitionToUnattachedVoted(candidateKey, candidateEpoch);
+        if (!preVote && voteGranted && quorum.isUnattachedNotVoted()) {
+            transitionToUnattachedVoted(replicaKey, replicaEpoch);
         }
 
-        logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
+        logger.info("Vote request {} with epoch {} is {}",
+            request,
+            replicaEpoch,
+            voteGranted ? "granted" : "rejected");

Review Comment:
   How about this formatting:
   ```java
           logger.info(
               "Vote request {} with epoch {} is {}",
               request,
               replicaEpoch,
               voteGranted ? "granted" : "rejected"
           );
   ```



##########
raft/src/main/java/org/apache/kafka/raft/ResignedState.java:
##########
@@ -140,10 +140,21 @@ public List<ReplicaKey> preferredSuccessors() {
     }
 
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
         log.debug(
-            "Rejecting vote request from candidate ({}) since we have resigned as candidate/leader in epoch {}",
-            candidateKey,
+            "Rejecting Vote request from candidate ({}) since we have resigned as leader in epoch {}",
+            replicaKey,
+            epoch
+        );
+
+        return false;
+    }
+
+    @Override
+    public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {

Review Comment:
   Did we agree that `canGrantPreVote` is true in the resigned state if the log is up to date?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,51 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest =
             request.topics().get(0).partitions().get(0);
 
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
 
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
+        if (isIllegalEpoch) {
+            logger.info(
+                "Received a vote request from replica {} with illegal epoch {} and last epoch {}",
+                replicaId,
+                replicaEpoch,
+                lastEpoch

Review Comment:
   We should also log the value of `preVote`, since it determines how `isIllegalEpoch` is computed.
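   A minimal sketch of a message that also carries `preVote` is shown below. To stay self-contained, it uses `String.format` instead of the SLF4J-style `logger.info(...)` placeholders used in `KafkaRaftClient`. `IllegalEpochLogSketch` and `illegalEpochMessage` are hypothetical names.
   
   ```java
   public final class IllegalEpochLogSketch {
       private IllegalEpochLogSketch() { }
   
       /** Builds the log line for a Vote request rejected because of its epochs, including the preVote flag. */
       static String illegalEpochMessage(int replicaId, int replicaEpoch, int lastEpoch, boolean preVote) {
           // preVote selects the comparison (lastEpoch > replicaEpoch vs. lastEpoch >= replicaEpoch),
           // so the rejection cannot be interpreted without it.
           return String.format(
               "Received a vote request from replica %d with illegal epoch %d and last epoch %d (preVote: %b)",
               replicaId,
               replicaEpoch,
               lastEpoch,
               preVote
           );
       }
   }
   ```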



##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -202,16 +205,34 @@ public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
     }
 
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
         log.debug(
             "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}",
-            candidateKey,
+            replicaKey,
             leaderId,
             epoch
         );
         return false;
     }
 
+    @Override
+    public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
+        if (hasFetchedFromLeader) {
+            log.debug(
+                "Rejecting PreVote request from replica ({}) since we already have a leader {} in epoch {}",
+                replicaKey,
+                leaderId,
+                epoch
+            );
+            return false;
+        } else if (!isLogUpToDate) {
+            log.debug(
+                "Rejecting PreVote request from replica ({}) since replica epoch/offset is not up to date with us",
+                replicaKey);
+        }
+        return isLogUpToDate;
+    }

Review Comment:
   This method has a lot of lines because we are trying to generate a very specific log message. I don't think we need that, since debug messages are for developers. How about:
   ```java
       @Override
       public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
           boolean granting = !hasFetchedFromLeader && isLogUpToDate;
           if (!granting) {
               log.debug(
                   "Rejecting PreVote request from replica ({}) since leader is {}, epoch is {}, isLogUpToDate is {} and hasFetchedFromLeader is {}",
                   replicaKey,
                   leaderId,
                   epoch,
                   isLogUpToDate,
                   hasFetchedFromLeader
               );
           }
   
           return granting;
       }
   ```
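   A self-contained sketch of the suggested method is shown below. It also shows one way to document `hasFetchedFromLeader`, as asked in the earlier `FollowerState` comment. Several parts are assumptions:
   
   * The field's meaning (set once a Fetch from the current leader succeeds) is inferred from its name.
   * `markFetchedFromLeader` is a hypothetical hook.
   * The replica is identified by a plain `int` instead of `ReplicaKey`.
   * `System.Logger` stands in for the SLF4J logger, so the placeholders are `{0}`-style.
   
   ```java
   /**
    * Hypothetical, self-contained model of the PreVote rule in FollowerState.
    * Not the Kafka class: the replica is a plain int and System.Logger replaces SLF4J.
    */
   public final class FollowerPreVoteSketch {
       private static final System.Logger LOG = System.getLogger(FollowerPreVoteSketch.class.getName());
   
       private final int epoch;
       private final int leaderId;
   
       /**
        * True once this follower has fetched successfully from the leader of the current epoch.
        * While it is true, the leader is known to be alive, so PreVote requests are rejected.
        * (Meaning assumed from the field name in the PR.)
        */
       private boolean hasFetchedFromLeader;
   
       FollowerPreVoteSketch(int epoch, int leaderId) {
           this.epoch = epoch;
           this.leaderId = leaderId;
       }
   
       /** Hypothetical hook, assumed to run after a successful Fetch response from the leader. */
       void markFetchedFromLeader() {
           hasFetchedFromLeader = true;
       }
   
       /** Grants a PreVote only if nothing has been fetched from the leader and the requester's log is up to date. */
       boolean canGrantPreVote(int replicaId, boolean isLogUpToDate) {
           boolean granting = !hasFetchedFromLeader && isLogUpToDate;
           if (!granting) {
               // One debug line carries every input to the decision. System.Logger formats with
               // java.text.MessageFormat, so the placeholders are {0}, {1}, ... instead of SLF4J's {}.
               LOG.log(
                   System.Logger.Level.DEBUG,
                   "Rejecting PreVote request from replica ({0}) since leader is {1}, epoch is {2}, "
                       + "isLogUpToDate is {3} and hasFetchedFromLeader is {4}",
                   replicaId,
                   leaderId,
                   epoch,
                   isLogUpToDate,
                   hasFetchedFromLeader
               );
           }
           return granting;
       }
   }
   ```
   
   Logging every input in a single line keeps the method short. A developer reading the debug output can still tell which condition caused the rejection.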



##########
raft/src/main/java/org/apache/kafka/raft/RaftUtil.java:
##########
@@ -190,19 +192,24 @@ public static VoteResponseData singletonVoteResponse(
         int leaderEpoch,
         int leaderId,
         boolean voteGranted,
+        boolean preVote,
         Endpoints endpoints
     ) {
+        VoteResponseData.PartitionData partitionData = new VoteResponseData.PartitionData()
+            .setErrorCode(partitionLevelError.code())
+            .setLeaderId(leaderId)
+            .setLeaderEpoch(leaderEpoch)
+            .setVoteGranted(voteGranted);
+        if (apiVersion >= 2) {
+            partitionData.setPreVote(preVote);
+        }

Review Comment:
   I am pretty convinced that we should remove the `preVote` field from the response. What do you think?


