ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1917419710


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -86,186 +99,249 @@ public void 
testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean hasFetch
         assertTrue(context.client.quorum().isUnattachedNotVoted());
     }
 
-    @Test
-    public void testHandlePreVoteRequestAsCandidate() throws Exception {
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void 
testHandlePreVoteRequestAsFollowerWithVotedCandidate(KRaftVersion kraftVersion) 
throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey localKey = replicaKey(localId, true);
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        ReplicaKey votedCandidateKey = replicaKey(localId + 2, true);
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(localKey, 
otherNodeKey, votedCandidateKey));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localKey, voters, kraftVersion)
+            .withVotedCandidate(epoch, votedCandidateKey)
+            .withRaftProtocol(KIP_996_PROTOCOL)
+            .build();
+        context.pollUntilRequest();
+        context.assertSentFetchRequest();
+        context.deliverRequest(context.beginEpochRequest(epoch, 
votedCandidateKey.id(), voters.listeners(votedCandidateKey.id())));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        assertTrue(context.client.quorum().isFollower());
+
+        // follower can grant PreVotes if it has not fetched successfully from 
leader yet
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(votedCandidateKey.id()), true);
+
+        // after fetching from leader, follower should reject PreVote requests
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(epoch, votedCandidateKey.id(), 
MemoryRecords.EMPTY, 0L, Errors.NONE)
+        );
+
+        context.client.poll();
+        assertTrue(context.client.quorum().isFollower());
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(votedCandidateKey.id()), false);
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void testHandlePreVoteRequestAsCandidate(KRaftVersion kraftVersion) 
throws Exception {
         int localId = randomReplicaId();
+        ReplicaKey localKey = replicaKey(localId, true);
         ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
         ReplicaKey observer = replicaKey(localId + 2, true);
-        int leaderEpoch = 2;
-        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+        int epoch = 2;
 
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
-            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
-            .withKip853Rpc(true)
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey,
+            VoterSetTest.voterSet(Stream.of(localKey, otherNodeKey)),
+            kraftVersion
+        )
+            .withVotedCandidate(epoch, ReplicaKey.of(localId, 
localKey.directoryId().get()))
+            .withRaftProtocol(KIP_996_PROTOCOL)
             .build();
         assertTrue(context.client.quorum().isCandidate());
 
         // candidate should grant pre-vote requests with the same epoch if log 
is up-to-date
-        context.deliverRequest(context.preVoteRequest(leaderEpoch, 
otherNodeKey, leaderEpoch, 1));
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
         context.pollUntilResponse();
 
-        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
-        context.assertVotedCandidate(leaderEpoch, localId);
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(epoch, localKey);
         assertTrue(context.client.quorum().isCandidate());
 
-        // if an observer sends a pre-vote request for the same epoch, it 
should also be granted
-        context.deliverRequest(context.preVoteRequest(leaderEpoch, observer, 
leaderEpoch, 1));
+        // if an observer with up-to-date log sends a pre-vote request for the 
same epoch, it should also be granted
+        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 
2));
         context.pollUntilResponse();
 
-        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
-        context.assertVotedCandidate(leaderEpoch, localId);
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(epoch, localKey);
         assertTrue(context.client.quorum().isCandidate());
 
         // candidate will transition to unattached if pre-vote request has a 
higher epoch
-        context.deliverRequest(context.preVoteRequest(leaderEpoch + 1, 
otherNodeKey, leaderEpoch + 1, 1));
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 2));
         context.pollUntilResponse();
 
-        context.assertSentVoteResponse(Errors.NONE, leaderEpoch + 1, 
OptionalInt.of(-1), true);
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
         assertTrue(context.client.quorum().isUnattached());
     }
 
-    @Test
-    public void testHandlePreVoteRequestAsUnattachedObserver() throws 
Exception {
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void testHandlePreVoteRequestAsUnattachedObserver(KRaftVersion 
kraftVersion) throws Exception {
         int localId = randomReplicaId();
         int epoch = 2;
+        ReplicaKey localKey = replicaKey(localId, true);
         ReplicaKey replica1 = replicaKey(localId + 1, true);
         ReplicaKey replica2 = replicaKey(localId + 2, true);
         ReplicaKey observer = replicaKey(localId + 3, true);
-        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
 
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey,
+            VoterSetTest.voterSet(Stream.of(replica1, replica2)),
+            kraftVersion
+        )
             .withUnknownLeader(epoch)
-            .withKip853Rpc(true)
+            .withRaftProtocol(KIP_996_PROTOCOL)
             .build();
+        assertTrue(context.client.quorum().isUnattached());
+        assertTrue(context.client.quorum().isObserver());
 
+        // if a voter with up-to-date log sends a pre-vote request, it should 
be granted
         context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
         context.pollUntilResponse();
-
-        assertTrue(context.client.quorum().isUnattached());
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
 
-        // if same replica sends another pre-vote request for the same epoch, 
it should be granted
+        // if same voter sends another pre-vote request, it can be granted if 
the sender's log is still up-to-date
         context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
         context.pollUntilResponse();
-
-        assertTrue(context.client.quorum().isUnattached());
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
 
-        // if different replica sends a pre-vote request for the same epoch, 
it should be granted
+        // if different voter with up-to-date log sends a pre-vote request for 
the same epoch, it will be granted
         context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 
1));
         context.pollUntilResponse();
-
-        assertTrue(context.client.quorum().isUnattached());
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
 
-        // if an observer sends a pre-vote request for the same epoch, it 
should also be granted
-        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 
1));
+        // if an observer with up-to-date log sends a pre-vote request for the 
same epoch, it will be granted
+        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 
2));

Review Comment:
   hm, can't think of a reason. I'll standardize



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to