jsancio commented on code in PR #19589: URL: https://github.com/apache/kafka/pull/19589#discussion_r2258341169
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3271,13 +3317,46 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { backoffMs, Math.min( state.remainingFetchTimeMs(currentTimeMs), - state.remainingUpdateVoterPeriodMs(currentTimeMs) + state.remainingUpdateVoterSetPeriodMs(currentTimeMs) ) ); } private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { - return maybeSendFetchToBestNode(state, currentTimeMs); + if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter && + quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) { + /* Only replicas that can become a voter and are configured to auto join should + * attempt to automatically join the voter set for the configured topic partition. + */ + final var localReplicaKey = quorum.localReplicaKeyOrThrow(); + final var voters = partitionState.lastVoterSet(); + final RequestSendResult sendResult; + if (voters.voterIds().contains(localReplicaKey.id())) { + /* Replica id is in the voter set but replica is not voter. Remove old voter. + * Local replica is not in the voter set because the replica is an observer. + */ + final var oldVoter = voters.voterKeys() + .stream() + .filter(replicaKey -> replicaKey.id() == localReplicaKey.id()) + .findFirst() + .get(); + sendResult = maybeSendRemoveVoterRequest(state, oldVoter, currentTimeMs); + } else { + sendResult = maybeSendAddVoterRequest(state, currentTimeMs); + } + if (sendResult.requestSent()) { + state.resetUpdateVoterSetPeriod(currentTimeMs); + } + return Math.min( + sendResult.timeToWaitMs(), + Math.min( + state.remainingFetchTimeMs(currentTimeMs), + state.remainingUpdateVoterSetPeriodMs(currentTimeMs) + ) + ); + } else { + return maybeSendFetchToBestNode(state, currentTimeMs); Review Comment: This backoff is not correct now that observers can send AddVoter and RemoveVoter requests. Take a look at how I solved it for `pollFollowerAsVoter`. 
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3271,13 +3317,46 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) { backoffMs, Math.min( state.remainingFetchTimeMs(currentTimeMs), - state.remainingUpdateVoterPeriodMs(currentTimeMs) + state.remainingUpdateVoterSetPeriodMs(currentTimeMs) ) ); } private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { - return maybeSendFetchToBestNode(state, currentTimeMs); + if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter && + quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) { Review Comment: Let's add a `shouldSendAddOrRemoveVoterRequest` similar to `shouldSendUpdateVoterRequest`. This would allow you to better document this predicate. ########## raft/src/main/java/org/apache/kafka/raft/RaftUtil.java: ########## @@ -524,14 +526,16 @@ public static AddRaftVoterRequestData addVoterRequest( String clusterId, int timeoutMs, ReplicaKey voter, - Endpoints listeners + Endpoints listeners, + boolean ackWhenCommitted Review Comment: I see. That's fair. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org