jsancio commented on code in PR #17352: URL: https://github.com/apache/kafka/pull/17352#discussion_r1876361982
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3137,12 +3139,12 @@ private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) { transitionToCandidate(currentTimeMs); return 0L; } else { - return state.remainingElectionTimeMs(currentTimeMs); + return pollUnattachedAsObserver(state, currentTimeMs); Review Comment: This is correct, but it is odd naming that `pollUnattachedAsVoter` calls `pollUnattachedAsObserver`. Maybe the name `pollUnattachedCommon` makes more sense. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2628,6 +2697,100 @@ public void testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc) context.assertElectedLeader(epoch, voter3); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int leaderId = localId + 1; + int otherNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .collect(Collectors.toList()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withKip853Rpc(withKip853Rpc) + .withElectedLeader(epoch, leaderId) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest1.destination().id()); + context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest1.correlationId(), + fetchRequest1.destination(), + context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, Errors.BROKER_NOT_AVAILABLE) + ); + context.pollUntilRequest(); + + // We should retry the Fetch against the other voter 
since the original + // voter connection will be backing off. + RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertNotEquals(leaderId, fetchRequest2.destination().id()); + assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); + context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); + + Errors error = fetchRequest2.destination().id() == leaderId ? + Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER; + context.deliverResponse( + fetchRequest2.correlationId(), + fetchRequest2.destination(), + context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, error) + ); + context.client.poll(); + + context.assertElectedLeader(epoch, leaderId); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int leaderId = localId + 1; + int otherNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .collect(Collectors.toList()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withKip853Rpc(withKip853Rpc) + .withElectedLeader(epoch, leaderId) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest1.destination().id()); + context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); + + context.time.sleep(context.requestTimeoutMs()); + context.pollUntilRequest(); + + // We should retry the Fetch against the other voter since the original + // voter connection will be backing off. 
+ RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertNotEquals(leaderId, fetchRequest2.destination().id()); + assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); + context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest2.correlationId(), + fetchRequest2.destination(), + context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH) + ); + context.client.poll(); + + context.assertElectedLeader(epoch, leaderId); Review Comment: Similar comment here. The leader didn't change so you don't know if this is from the FETCH response for `fetchRequest2` or the existing state of the replica. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2628,6 +2697,100 @@ public void testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc) context.assertElectedLeader(epoch, voter3); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int leaderId = localId + 1; + int otherNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .collect(Collectors.toList()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withKip853Rpc(withKip853Rpc) + .withElectedLeader(epoch, leaderId) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest1.destination().id()); + context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest1.correlationId(), + 
fetchRequest1.destination(), + context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, Errors.BROKER_NOT_AVAILABLE) + ); + context.pollUntilRequest(); + + // We should retry the Fetch against the other voter since the original + // voter connection will be backing off. + RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertNotEquals(leaderId, fetchRequest2.destination().id()); + assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); + context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); + + Errors error = fetchRequest2.destination().id() == leaderId ? + Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER; Review Comment: Would this always be `NOT_LEADER_OR_FOLLOWER` since `assertNotEquals(leaderId, ...)` passed? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2628,6 +2697,100 @@ public void testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc) context.assertElectedLeader(epoch, voter3); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int leaderId = localId + 1; + int otherNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .collect(Collectors.toList()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withKip853Rpc(withKip853Rpc) + .withElectedLeader(epoch, leaderId) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest1.destination().id()); + context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); + + context.deliverResponse( + 
fetchRequest1.correlationId(), + fetchRequest1.destination(), + context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, Errors.BROKER_NOT_AVAILABLE) + ); + context.pollUntilRequest(); + + // We should retry the Fetch against the other voter since the original + // voter connection will be backing off. + RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertNotEquals(leaderId, fetchRequest2.destination().id()); + assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); + context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); + + Errors error = fetchRequest2.destination().id() == leaderId ? + Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER; + context.deliverResponse( + fetchRequest2.correlationId(), + fetchRequest2.destination(), + context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, error) + ); + context.client.poll(); + + context.assertElectedLeader(epoch, leaderId); Review Comment: Okay but technically, the leader didn't change so you don't know if this is from the FETCH response for `fetchRequest2` or the existing state of the replica. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3057,28 +3057,30 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { if (state.hasFetchTimeoutExpired(currentTimeMs)) { - return maybeSendAnyVoterFetch(currentTimeMs); + return maybeSendAnyBootstrapFetch(currentTimeMs); } else { - final long backoffMs; - - // If the current leader is backing off due to some failure or if the - // request has timed out, then we attempt to send the Fetch to another - // voter in order to discover if there has been a leader change. 
- Node leaderNode = state.leaderNode(channel.listenerName()); - if (requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) { - // Once the request has timed out backoff the connection - requestManager.reset(leaderNode); - backoffMs = maybeSendAnyVoterFetch(currentTimeMs); - } else if (requestManager.isBackingOff(leaderNode, currentTimeMs)) { - backoffMs = maybeSendAnyVoterFetch(currentTimeMs); - } else if (!requestManager.hasAnyInflightRequest(currentTimeMs)) { - backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs); - } else { - backoffMs = requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs); - } + return maybeSendFetchToQuorum(state, currentTimeMs); + } + } - return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs)); + private long maybeSendFetchToQuorum(FollowerState state, long currentTimeMs) { Review Comment: How about `maybeSendFetchToBestNode`? The best node is the leader, but if the leader is unavailable then send the request to any of the bootstrap servers. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2755,7 +2755,7 @@ private FetchRequestData buildFetchRequest() { .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel())); } - private long maybeSendAnyVoterFetch(long currentTimeMs) { + private long maybeSendAnyBootstrapFetch(long currentTimeMs) { Review Comment: How about `maybeSendToAnyBootstrap`? 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2628,6 +2697,100 @@ public void testVoteResponseIgnoredAfterBecomingFollower(boolean withKip853Rpc) context.assertElectedLeader(epoch, voter3); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int leaderId = localId + 1; + int otherNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .collect(Collectors.toList()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withKip853Rpc(withKip853Rpc) + .withElectedLeader(epoch, leaderId) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest1.destination().id()); + context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest1.correlationId(), + fetchRequest1.destination(), + context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, Errors.BROKER_NOT_AVAILABLE) + ); + context.pollUntilRequest(); + + // We should retry the Fetch against the other voter since the original + // voter connection will be backing off. + RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertNotEquals(leaderId, fetchRequest2.destination().id()); + assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); + context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); + + Errors error = fetchRequest2.destination().id() == leaderId ? 
+ Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER; + context.deliverResponse( + fetchRequest2.correlationId(), + fetchRequest2.destination(), + context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, error) + ); + context.client.poll(); + + context.assertElectedLeader(epoch, leaderId); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int leaderId = localId + 1; + int otherNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId); + List<InetSocketAddress> bootstrapServers = voters + .stream() + .map(RaftClientTestContext::mockAddress) + .collect(Collectors.toList()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withBootstrapServers(Optional.of(bootstrapServers)) + .withKip853Rpc(withKip853Rpc) + .withElectedLeader(epoch, leaderId) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest1.destination().id()); + context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); + + context.time.sleep(context.requestTimeoutMs()); + context.pollUntilRequest(); + + // We should retry the Fetch against the other voter since the original + // voter connection will be backing off. + RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertNotEquals(leaderId, fetchRequest2.destination().id()); + assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); + context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); + + context.deliverResponse( + fetchRequest2.correlationId(), + fetchRequest2.destination(), + context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH) Review Comment: Is this an accurate error? `FENCED_LEADER_EPOCH`. 
Technically, a replica returns this error if the sent epoch is less than the epoch known by the receiving replica. This is not the case in this test case since the returned epoch is the same as the sent epoch. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1780,6 +1780,75 @@ public void testUnattachedAsObserverDoesNotBecomeCandidateAfterElectionTimeout(b assertEquals(0, context.channel.drainSendQueue().size()); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + int leaderNodeId = localId + 2; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId, leaderNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withUnknownLeader(epoch) + .withKip853Rpc(withKip853Rpc) + .build(); + + context.pollUntilRequest(); + RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0L, 0); + assertTrue(context.client.quorum().isUnattached()); + assertTrue(context.client.quorum().isVoter()); + + // receives a fetch response specifying who the leader is + Errors responseError = (request.destination().id() == otherNodeId) ? Errors.NOT_LEADER_OR_FOLLOWER : Errors.NONE; + context.deliverResponse( + request.correlationId(), + request.destination(), + context.fetchResponse(epoch, leaderNodeId, MemoryRecords.EMPTY, 0L, responseError) + ); + + context.client.poll(); + assertTrue(context.client.quorum().isFollower()); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testUnattachedAsVoterCanBecomeCandidate(boolean withKip853Rpc) throws Exception { Review Comment: I would assume that we would already have a similar test. How about extending `testInitializeAsCandidateAndBecomeLeader` to do any additional checks done in this test? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org