ahuang98 commented on code in PR #18240: URL: https://github.com/apache/kafka/pull/18240#discussion_r1917454524
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java: ########## @@ -573,4 +700,450 @@ public void testRejectPreVoteIfRemoteLogIsNotUpToDate() throws Exception { assertTrue(context.client.quorum().isUnattached()); context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false); } + + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testPreVoteResponseIgnoredAfterBecomingFollower(KRaftVersion kraftVersion) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey voter2 = replicaKey(localId + 1, true); + ReplicaKey voter3 = replicaKey(localId + 2, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, voter2, voter3)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(KIP_996_PROTOCOL) + .build(); + + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure transition to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + + // Wait until the vote requests are inflight + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + + // While the vote requests are still inflight, replica receives a BeginEpoch for the same epoch + context.deliverRequest(context.beginEpochRequest(epoch, voter3.id())); + context.client.poll(); + context.assertElectedLeader(epoch, voter3.id()); + + // If PreVote responses are received now they should be ignored + VoteResponseData voteResponse1 = context.voteResponse(true, OptionalInt.empty(), epoch); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + voteResponse1 + ); + + VoteResponseData voteResponse2 = context.voteResponse(true, OptionalInt.of(voter3.id()), 
epoch); + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + voteResponse2 + ); + + context.client.poll(); + context.assertElectedLeader(epoch, voter3.id()); + } + + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testPreVoteNotSupportedByRemote(KRaftVersion kraftVersion) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey voter2Key = replicaKey(localId + 1, true); + ReplicaKey voter3Key = replicaKey(localId + 2, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, voter2Key, voter3Key)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(KIP_996_PROTOCOL) + .build(); + + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure transition to Prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + assertEquals(epoch, context.currentEpoch()); + assertTrue(context.client.quorum().isProspective()); + + // Simulate one remote node not supporting PreVote with UNSUPPORTED_VERSION response. + // Note: with the mocked network client we simulate this is a bit differently, in reality this response would + // be generated from the network client and not sent from the remote node. + List<RaftRequest.Outbound> voteRequests = context.collectPreVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ); + + // Local should transition to Candidate since it realizes remote node does not support PreVote. 
+ context.client.poll(); + assertEquals(epoch + 1, context.currentEpoch()); + context.client.quorum().isCandidate(); + + // Any further PreVote requests should be ignored + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + context.voteResponse(true, OptionalInt.empty(), epoch) + ); + context.client.poll(); + assertEquals(epoch + 1, context.currentEpoch()); + context.client.quorum().isCandidate(); + context.collectVoteRequests(epoch + 1, 0, 0); + + // Sleep to transition back to Prospective + context.time.sleep(context.client.quorum().candidateStateOrThrow().remainingElectionTimeMs(context.time.milliseconds())); + context.client.poll(); + assertEquals(epoch + 1, context.currentEpoch()); + assertTrue(context.client.quorum().isProspective()); + + // Simulate receiving enough valid PreVote responses for election to succeed + context.pollUntilRequest(); + voteRequests = context.collectPreVoteRequests(epoch + 1, 0, 0); + assertEquals(2, voteRequests.size()); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + context.voteResponse(true, OptionalInt.empty(), epoch + 1) + ); + context.client.poll(); + assertEquals(epoch + 2, context.currentEpoch()); + context.client.quorum().isCandidate(); + + // Any further PreVote responses should be ignored + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ); + context.client.poll(); + assertEquals(epoch + 2, context.currentEpoch()); + context.client.quorum().isCandidate(); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveReceivesBeginQuorumRequest( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey leader = replicaKey(localId + 
1, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, leader)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(raftProtocol) + .build(); + + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure transition to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + + assertTrue(context.client.quorum().isProspective()); + + context.deliverRequest(context.beginEpochRequest(epoch, leader.id())); + context.client.poll(); + + assertTrue(context.client.quorum().isFollower()); + context.assertElectedLeader(epoch, leader.id()); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveTransitionsToUnattachedOnElectionFailure( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey otherNode = replicaKey(localId + 1, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, otherNode)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(raftProtocol) + .build(); + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure that transition to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // If election timeout expires, replica should transition to unattached to attempt re-discovering leader + context.time.sleep(context.electionTimeoutMs() * 2L); + context.client.poll(); + assertTrue(context.client.quorum().isUnattached()); + + // After election times out again, replica will transition back to prospective and send PreVote requests + 
context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + RaftRequest.Outbound voteRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // If prospective receives enough rejected votes, it also transitions to unattached immediately + context.deliverResponse( + voteRequest.correlationId(), + voteRequest.destination(), + context.voteResponse(false, OptionalInt.empty(), epoch)); + context.client.poll(); + assertTrue(context.client.quorum().isUnattached()); + + // After election times out again, replica will transition back to prospective and send PreVote requests + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + context.assertSentVoteRequest(epoch, 0, 0L, 1); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveWithLeaderTransitionsToFollower( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey replica1 = replicaKey(localId + 1, true); + ReplicaKey replica2 = replicaKey(localId + 2, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, replica1, replica2)), + kraftVersion + ) + .withElectedLeader(epoch, replica1.id()) + .withRaftProtocol(raftProtocol) + .build(); + context.assertElectedLeader(epoch, replica1.id()); + assertTrue(context.client.quorum().isFollower()); + + // Sleep a little to ensure transition to prospective + context.time.sleep(context.fetchTimeoutMs); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + context.assertSentVoteRequest(epoch, 0, 0L, 2); + + // If election timeout expires, replica should transition back to follower if it hasn't found new leader yet + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + 
context.assertSentFetchRequest(); + assertTrue(context.client.quorum().isFollower()); + context.assertElectedLeader(epoch, replica1.id()); + + // After election times out again, replica will transition back to prospective and send PreVote requests + context.time.sleep(context.fetchTimeoutMs); + context.pollUntilRequest(); + List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + assertTrue(context.client.quorum().isProspective()); + context.assertElectedLeader(epoch, replica1.id()); + + // If prospective receives enough rejected votes without leaderId, it also transitions to follower immediately + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + context.voteResponse(false, OptionalInt.empty(), epoch)); + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + context.voteResponse(false, OptionalInt.empty(), epoch)); + // handle vote response and mark rejected vote + context.client.poll(); + // transition to follower after seeing election has failed + context.pollUntilRequest(); Review Comment: I think I wanted to be more explicit about what happens at each step — I'll replace `pollUntilRequest()` with the necessary `poll()` calls. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org