ahuang98 commented on code in PR #18240: URL: https://github.com/apache/kafka/pull/18240#discussion_r1917419710
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java: ########## @@ -86,186 +99,249 @@ public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean hasFetch assertTrue(context.client.quorum().isUnattachedNotVoted()); } - @Test - public void testHandlePreVoteRequestAsCandidate() throws Exception { + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testHandlePreVoteRequestAsFollowerWithVotedCandidate(KRaftVersion kraftVersion) throws Exception { + int localId = randomReplicaId(); + int epoch = 2; + ReplicaKey localKey = replicaKey(localId, true); + ReplicaKey otherNodeKey = replicaKey(localId + 1, true); + ReplicaKey votedCandidateKey = replicaKey(localId + 2, true); + VoterSet voters = VoterSetTest.voterSet(Stream.of(localKey, otherNodeKey, votedCandidateKey)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localKey, voters, kraftVersion) + .withVotedCandidate(epoch, votedCandidateKey) + .withRaftProtocol(KIP_996_PROTOCOL) + .build(); + context.pollUntilRequest(); + context.assertSentFetchRequest(); + context.deliverRequest(context.beginEpochRequest(epoch, votedCandidateKey.id(), voters.listeners(votedCandidateKey.id()))); + context.pollUntilResponse(); + context.assertSentBeginQuorumEpochResponse(Errors.NONE); + assertTrue(context.client.quorum().isFollower()); + + // follower can grant PreVotes if it has not fetched successfully from leader yet + context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, epoch, 1)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(votedCandidateKey.id()), true); + + // after fetching from leader, follower should reject PreVote requests + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + context.deliverResponse( + fetchRequest.correlationId(), + fetchRequest.destination(), + context.fetchResponse(epoch, votedCandidateKey.id(), 
MemoryRecords.EMPTY, 0L, Errors.NONE) + ); + + context.client.poll(); + assertTrue(context.client.quorum().isFollower()); + context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, epoch, 1)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(votedCandidateKey.id()), false); + } + + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testHandlePreVoteRequestAsCandidate(KRaftVersion kraftVersion) throws Exception { int localId = randomReplicaId(); + ReplicaKey localKey = replicaKey(localId, true); ReplicaKey otherNodeKey = replicaKey(localId + 1, true); ReplicaKey observer = replicaKey(localId + 2, true); - int leaderEpoch = 2; - Set<Integer> voters = Set.of(localId, otherNodeKey.id()); + int epoch = 2; - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID)) - .withKip853Rpc(true) + RaftClientTestContext context = new RaftClientTestContext.Builder( + localKey, + VoterSetTest.voterSet(Stream.of(localKey, otherNodeKey)), + kraftVersion + ) + .withVotedCandidate(epoch, ReplicaKey.of(localId, localKey.directoryId().get())) + .withRaftProtocol(KIP_996_PROTOCOL) .build(); assertTrue(context.client.quorum().isCandidate()); // candidate should grant pre-vote requests with the same epoch if log is up-to-date - context.deliverRequest(context.preVoteRequest(leaderEpoch, otherNodeKey, leaderEpoch, 1)); + context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, epoch, 1)); context.pollUntilResponse(); - context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), true); - context.assertVotedCandidate(leaderEpoch, localId); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true); + context.assertVotedCandidate(epoch, localKey); assertTrue(context.client.quorum().isCandidate()); - // if an observer sends a pre-vote request for the same 
epoch, it should also be granted - context.deliverRequest(context.preVoteRequest(leaderEpoch, observer, leaderEpoch, 1)); + // if an observer with up-to-date log sends a pre-vote request for the same epoch, it should also be granted + context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 2)); context.pollUntilResponse(); - context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), true); - context.assertVotedCandidate(leaderEpoch, localId); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true); + context.assertVotedCandidate(epoch, localKey); assertTrue(context.client.quorum().isCandidate()); // candidate will transition to unattached if pre-vote request has a higher epoch - context.deliverRequest(context.preVoteRequest(leaderEpoch + 1, otherNodeKey, leaderEpoch + 1, 1)); + context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, epoch + 1, 2)); context.pollUntilResponse(); - context.assertSentVoteResponse(Errors.NONE, leaderEpoch + 1, OptionalInt.of(-1), true); + context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.of(-1), true); assertTrue(context.client.quorum().isUnattached()); } - @Test - public void testHandlePreVoteRequestAsUnattachedObserver() throws Exception { + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testHandlePreVoteRequestAsUnattachedObserver(KRaftVersion kraftVersion) throws Exception { int localId = randomReplicaId(); int epoch = 2; + ReplicaKey localKey = replicaKey(localId, true); ReplicaKey replica1 = replicaKey(localId + 1, true); ReplicaKey replica2 = replicaKey(localId + 2, true); ReplicaKey observer = replicaKey(localId + 3, true); - Set<Integer> voters = Set.of(replica1.id(), replica2.id()); - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + RaftClientTestContext context = new RaftClientTestContext.Builder( + localKey, + VoterSetTest.voterSet(Stream.of(replica1, replica2)), + 
kraftVersion + ) .withUnknownLeader(epoch) - .withKip853Rpc(true) + .withRaftProtocol(KIP_996_PROTOCOL) .build(); + assertTrue(context.client.quorum().isUnattached()); + assertTrue(context.client.quorum().isObserver()); + // if a voter with up-to-date log sends a pre-vote request, it should be granted context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 1)); context.pollUntilResponse(); - - assertTrue(context.client.quorum().isUnattached()); context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true); - // if same replica sends another pre-vote request for the same epoch, it should be granted + // if same voter sends another pre-vote request, it can be granted if the sender's log is still up-to-date context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 1)); context.pollUntilResponse(); - - assertTrue(context.client.quorum().isUnattached()); context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true); - // if different replica sends a pre-vote request for the same epoch, it should be granted + // if different voter with up-to-date log sends a pre-vote request for the same epoch, it will be granted context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 1)); context.pollUntilResponse(); - - assertTrue(context.client.quorum().isUnattached()); context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true); - // if an observer sends a pre-vote request for the same epoch, it should also be granted - context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 1)); + // if an observer with up-to-date log sends a pre-vote request for the same epoch, it will be granted + context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 2)); Review Comment: hm, can't think of a reason. I'll standardize -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org