ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1917454524


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -573,4 +700,450 @@ public void testRejectPreVoteIfRemoteLogIsNotUpToDate() 
throws Exception {
         assertTrue(context.client.quorum().isUnattached());
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), false);
     }
+
+    /**
+     * A prospective replica that accepts a BeginEpoch request for the current epoch while its
+     * vote requests are still in flight must keep reporting that leader as elected once the
+     * late vote responses are delivered.
+     */
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void testPreVoteResponseIgnoredAfterBecomingFollower(KRaftVersion kraftVersion) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey localKey = replicaKey(localId, true);
+        ReplicaKey peerKey = replicaKey(localId + 1, true);
+        ReplicaKey leaderKey = replicaKey(localId + 2, true);
+        final int epoch = 5;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey,
+            VoterSetTest.voterSet(Stream.of(localKey, peerKey, leaderKey)),
+            kraftVersion
+        )
+            .withUnknownLeader(epoch)
+            .withRaftProtocol(KIP_996_PROTOCOL)
+            .build();
+
+        context.assertUnknownLeaderAndNoVotedCandidate(epoch);
+
+        // Sleep for twice the election timeout so that the replica transitions to prospective
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+
+        // Poll until the vote requests have been sent and are awaiting their responses
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isProspective());
+        List<RaftRequest.Outbound> inflightRequests = context.collectVoteRequests(epoch, 0, 0);
+        assertEquals(2, inflightRequests.size());
+
+        // Before any response arrives, a BeginEpoch request for the same epoch is handled
+        context.deliverRequest(context.beginEpochRequest(epoch, leaderKey.id()));
+        context.client.poll();
+        context.assertElectedLeader(epoch, leaderKey.id());
+
+        // The late PreVote responses are delivered now; both are expected to be ignored
+        VoteResponseData responseWithoutLeader = context.voteResponse(true, OptionalInt.empty(), epoch);
+        RaftRequest.Outbound firstRequest = inflightRequests.get(0);
+        context.deliverResponse(
+            firstRequest.correlationId(),
+            firstRequest.destination(),
+            responseWithoutLeader
+        );
+
+        VoteResponseData responseWithLeader = context.voteResponse(true, OptionalInt.of(leaderKey.id()), epoch);
+        RaftRequest.Outbound secondRequest = inflightRequests.get(1);
+        context.deliverResponse(
+            secondRequest.correlationId(),
+            secondRequest.destination(),
+            responseWithLeader
+        );
+
+        // The elected leader is unchanged after the responses have been processed
+        context.client.poll();
+        context.assertElectedLeader(epoch, leaderKey.id());
+    }
+
+    /**
+     * Covers a remote voter that answers a PreVote request with UNSUPPORTED_VERSION.
+     *
+     * The test checks that:
+     * - the UNSUPPORTED_VERSION response moves the local replica from prospective to candidate
+     *   and bumps the epoch by one;
+     * - a PreVote response arriving afterwards does not change the epoch or the candidate state;
+     * - once the candidate's remaining election time has elapsed, the replica is prospective
+     *   again in the bumped epoch;
+     * - a successful PreVote response then moves it to candidate with the epoch bumped once
+     *   more, and a late UNSUPPORTED_VERSION response leaves that state untouched.
+     */
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void testPreVoteNotSupportedByRemote(KRaftVersion kraftVersion) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey voter2Key = replicaKey(localId + 1, true);
+        ReplicaKey voter3Key = replicaKey(localId + 2, true);
+        int epoch = 5;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local,
+            VoterSetTest.voterSet(Stream.of(local, voter2Key, voter3Key)),
+            kraftVersion
+        )
+            .withUnknownLeader(epoch)
+            .withRaftProtocol(KIP_996_PROTOCOL)
+            .build();
+
+        context.assertUnknownLeaderAndNoVotedCandidate(epoch);
+
+        // Sleep a little to ensure transition to Prospective
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        assertEquals(epoch, context.currentEpoch());
+        assertTrue(context.client.quorum().isProspective());
+
+        // Simulate one remote node not supporting PreVote with an UNSUPPORTED_VERSION response.
+        // Note: with the mocked network client we simulate this a bit differently; in reality this
+        // response would be generated by the network client and not sent from the remote node.
+        List<RaftRequest.Outbound> voteRequests = context.collectPreVoteRequests(epoch, 0, 0);
+        assertEquals(2, voteRequests.size());
+        context.deliverResponse(
+            voteRequests.get(0).correlationId(),
+            voteRequests.get(0).destination(),
+            RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION)
+        );
+
+        // Local should transition to Candidate since it realizes remote node does not support PreVote.
+        context.client.poll();
+        assertEquals(epoch + 1, context.currentEpoch());
+        // The result of isCandidate() must be asserted; a bare call verifies nothing
+        assertTrue(context.client.quorum().isCandidate());
+
+        // Any further PreVote responses should be ignored
+        context.deliverResponse(
+            voteRequests.get(1).correlationId(),
+            voteRequests.get(1).destination(),
+            context.voteResponse(true, OptionalInt.empty(), epoch)
+        );
+        context.client.poll();
+        assertEquals(epoch + 1, context.currentEpoch());
+        assertTrue(context.client.quorum().isCandidate());
+        // NOTE(review): the returned list is unused; presumably the helper validates the
+        // outgoing Vote requests for the new epoch internally - confirm
+        context.collectVoteRequests(epoch + 1, 0, 0);
+
+        // Sleep for the candidate's remaining election time to transition back to Prospective
+        long remainingElectionTimeMs = context.client.quorum()
+            .candidateStateOrThrow()
+            .remainingElectionTimeMs(context.time.milliseconds());
+        context.time.sleep(remainingElectionTimeMs);
+        context.client.poll();
+        assertEquals(epoch + 1, context.currentEpoch());
+        assertTrue(context.client.quorum().isProspective());
+
+        // Simulate receiving enough valid PreVote responses for election to succeed
+        context.pollUntilRequest();
+        voteRequests = context.collectPreVoteRequests(epoch + 1, 0, 0);
+        assertEquals(2, voteRequests.size());
+        context.deliverResponse(
+            voteRequests.get(0).correlationId(),
+            voteRequests.get(0).destination(),
+            context.voteResponse(true, OptionalInt.empty(), epoch + 1)
+        );
+        context.client.poll();
+        assertEquals(epoch + 2, context.currentEpoch());
+        assertTrue(context.client.quorum().isCandidate());
+
+        // Any further PreVote responses should be ignored
+        context.deliverResponse(
+            voteRequests.get(1).correlationId(),
+            voteRequests.get(1).destination(),
+            RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION)
+        );
+        context.client.poll();
+        assertEquals(epoch + 2, context.currentEpoch());
+        assertTrue(context.client.quorum().isCandidate());
+    }
+
+    /**
+     * A prospective replica that handles a BeginEpoch request for its current epoch must become
+     * a follower of the replica named in that request.
+     */
+    @ParameterizedTest
+    @MethodSource("kraftVersionRaftProtocolCombinations")
+    public void testProspectiveReceivesBeginQuorumRequest(
+        KRaftVersion kraftVersion,
+        RaftProtocol raftProtocol
+    ) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey localKey = replicaKey(localId, true);
+        ReplicaKey leaderKey = replicaKey(localId + 1, true);
+        final int epoch = 5;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey,
+            VoterSetTest.voterSet(Stream.of(localKey, leaderKey)),
+            kraftVersion
+        )
+            .withUnknownLeader(epoch)
+            .withRaftProtocol(raftProtocol)
+            .build();
+
+        context.assertUnknownLeaderAndNoVotedCandidate(epoch);
+
+        // Sleep for twice the election timeout so that the replica transitions to prospective
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isProspective());
+
+        // Hand the prospective replica a BeginEpoch request from the other voter
+        context.deliverRequest(context.beginEpochRequest(epoch, leaderKey.id()));
+        context.client.poll();
+
+        // It should now be following that voter in the same epoch
+        assertTrue(context.client.quorum().isFollower());
+        context.assertElectedLeader(epoch, leaderKey.id());
+    }
+
+    /**
+     * A prospective replica with no known leader must fall back to unattached when its election
+     * fails, either because the election timeout expires or because its vote request is
+     * rejected, and must send a vote request again after a further election timeout.
+     */
+    @ParameterizedTest
+    @MethodSource("kraftVersionRaftProtocolCombinations")
+    public void testProspectiveTransitionsToUnattachedOnElectionFailure(
+        KRaftVersion kraftVersion,
+        RaftProtocol raftProtocol
+    ) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey localKey = replicaKey(localId, true);
+        ReplicaKey peerKey = replicaKey(localId + 1, true);
+        final int epoch = 5;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey,
+            VoterSetTest.voterSet(Stream.of(localKey, peerKey)),
+            kraftVersion
+        )
+            .withUnknownLeader(epoch)
+            .withRaftProtocol(raftProtocol)
+            .build();
+        context.assertUnknownLeaderAndNoVotedCandidate(epoch);
+
+        // Sleep for twice the election timeout so that the replica transitions to prospective
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isProspective());
+        context.assertSentVoteRequest(epoch, 0, 0L, 1);
+
+        // Failure path 1: the election timeout expires, so the replica should go back to
+        // unattached to attempt re-discovering the leader
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.client.poll();
+        assertTrue(context.client.quorum().isUnattached());
+
+        // Another election timeout later the replica is expected to be prospective again and to
+        // send PreVote requests
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        RaftRequest.Outbound sentRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1);
+
+        // Failure path 2: enough rejected votes also send the prospective replica to unattached
+        // immediately
+        context.deliverResponse(
+            sentRequest.correlationId(),
+            sentRequest.destination(),
+            context.voteResponse(false, OptionalInt.empty(), epoch)
+        );
+        context.client.poll();
+        assertTrue(context.client.quorum().isUnattached());
+
+        // After yet another election timeout the replica sends PreVote requests once more
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        context.assertSentVoteRequest(epoch, 0, 0L, 1);
+    }
+
+    @ParameterizedTest
+    @MethodSource("kraftVersionRaftProtocolCombinations")
+    public void testProspectiveWithLeaderTransitionsToFollower(
+        KRaftVersion kraftVersion,
+        RaftProtocol raftProtocol
+    ) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        int epoch = 5;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local,
+            VoterSetTest.voterSet(Stream.of(local, replica1, replica2)),
+            kraftVersion
+        )
+            .withElectedLeader(epoch, replica1.id())
+            .withRaftProtocol(raftProtocol)
+            .build();
+        context.assertElectedLeader(epoch, replica1.id());
+        assertTrue(context.client.quorum().isFollower());
+
+        // Sleep a little to ensure transition to prospective
+        context.time.sleep(context.fetchTimeoutMs);
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isProspective());
+        context.assertSentVoteRequest(epoch, 0, 0L, 2);
+
+        // If election timeout expires, replica should transition back to 
follower if it hasn't found new leader yet
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        context.assertSentFetchRequest();
+        assertTrue(context.client.quorum().isFollower());
+        context.assertElectedLeader(epoch, replica1.id());
+
+        // After election times out again, replica will transition back to 
prospective and send PreVote requests
+        context.time.sleep(context.fetchTimeoutMs);
+        context.pollUntilRequest();
+        List<RaftRequest.Outbound> voteRequests = 
context.collectVoteRequests(epoch, 0, 0);
+        assertEquals(2, voteRequests.size());
+        assertTrue(context.client.quorum().isProspective());
+        context.assertElectedLeader(epoch, replica1.id());
+
+        // If prospective receives enough rejected votes without leaderId, it 
also transitions to follower immediately
+        context.deliverResponse(
+            voteRequests.get(0).correlationId(),
+            voteRequests.get(0).destination(),
+            context.voteResponse(false, OptionalInt.empty(), epoch));
+        context.deliverResponse(
+            voteRequests.get(1).correlationId(),
+            voteRequests.get(1).destination(),
+            context.voteResponse(false, OptionalInt.empty(), epoch));
+        // handle vote response and mark rejected vote
+        context.client.poll();
+        // transition to follower after seeing election has failed
+        context.pollUntilRequest();

Review Comment:
   I think I wanted to be more explicit with what happens when - I'll replace 
`pollUntilRequest()` with the necessary `poll()` calls



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to