jsancio commented on code in PR #19589: URL: https://github.com/apache/kafka/pull/19589#discussion_r2070577759
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -99,6 +100,13 @@ public class QuorumConfig {
     public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
     public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
 
+    public static final String QUORUM_AUTO_JOIN_ENABLE = QUORUM_PREFIX + "auto.join.enable";
+    public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "If set to true, controllers will remove the entry " +
+        "in the voters set that matches its replica id but does not match its directory id if it exists by sending " +
+        "the RemoveVoter RPC. When no old entry for the controller exists in the voter set, it will then add itself " +
+        "by sending a AddVoter RPC to the leader.";

Review Comment:
   This documentation will be read by end users. They don't need to understand the algorithm. We could just use the paragraph from the KIP:

   > Controls whether a KRaft controller should automatically join the cluster metadata partition for its cluster id. If the configuration is set to true the controller must be stopped before removing the controller with kafka-metadata-quorum remove-controller.
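A minimal sketch of how the two constants might read with the KIP paragraph in place of the algorithm description. The holder class name is hypothetical and the `QUORUM_PREFIX` value is an assumption made for illustration; in the PR these constants live in `QuorumConfig`.

```java
/** Hypothetical holder class for illustration; not part of the PR. */
final class AutoJoinConfigDocSketch {
    // Assumed prefix value, shown only so the sketch compiles on its own.
    static final String QUORUM_PREFIX = "controller.quorum.";

    static final String QUORUM_AUTO_JOIN_ENABLE = QUORUM_PREFIX + "auto.join.enable";

    // User-facing wording copied from the KIP paragraph quoted in the review comment.
    static final String QUORUM_AUTO_JOIN_ENABLE_DOC =
        "Controls whether a KRaft controller should automatically join the cluster metadata partition " +
        "for its cluster id. If the configuration is set to true the controller must be stopped before " +
        "removing the controller with kafka-metadata-quorum remove-controller.";

    private AutoJoinConfigDocSketch() {}
}
```

The doc string now describes the operator-visible behavior and the one operational caveat, and leaves the RemoveVoter/AddVoter mechanics to the code.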
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3274,6 +3320,37 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
     private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
         if (state.hasFetchTimeoutExpired(currentTimeMs)) {
             return maybeSendFetchToAnyBootstrap(currentTimeMs);
+        } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
+            quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
+            var voters = partitionState.lastVoterSet();
+            var localReplicaKey = quorum.localReplicaKeyOrThrow();
+            final boolean resetAddRemoveVoterTimer;
+            final long backoffMs;
+
+            Optional<ReplicaKey> oldVoter = voters.getOldVoterForReplicaKey(localReplicaKey);

Review Comment:
   How about:
   ```java
   var voterIds = voters.voterIds();
   if (voterIds.contains(localReplicaKey.id())) {
       /* Replica id is in the voter set but replica is not voter. Remove old voter.
        * Local replica is not in the voter set because the replica is acting as an observer.
        */
       ...
   } else {
       // Replica id is not in the voter set. Add local replica
       ...
   }
   ```

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3328,6 +3405,23 @@ private UpdateRaftVoterRequestData buildUpdateVoterRequest() {
         );
     }
 
+    private AddRaftVoterRequestData buildAddVoterRequest() {
+        return RaftUtil.addVoterRequest(
+            clusterId,
+            // TODO: What to set the AddVoterRequest timeout to?
+            10000,

Review Comment:
   Make it the request timeout.

##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -161,6 +172,10 @@ public int appendLingerMs() {
         return appendLingerMs;
     }
 
+    public boolean autoJoinEnable() {
+        return autoJoinEnable;
+    }

Review Comment:
   This could just be `autoJoin`, for both the method name and the field name.
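The `voterIds` check suggested earlier for `pollFollowerAsObserver` can be sketched in isolation as follows. This is only an illustration of the decision, not the PR's code: `AutoJoinAction`, `AutoJoinSketch`, and `nextAction` are hypothetical names, and the voter set is reduced to a plain `Set<Integer>` of replica ids.

```java
import java.util.Set;

/** Hypothetical action names; in the PR these would map to sending RemoveVoter or AddVoter. */
enum AutoJoinAction { REMOVE_OLD_VOTER, ADD_LOCAL_REPLICA }

/** Hypothetical helper for illustration; not part of KafkaRaftClient. */
final class AutoJoinSketch {
    private AutoJoinSketch() {}

    /**
     * Decides the next step for a replica that is currently following as an observer.
     *
     * @param voterIds replica ids present in the latest voter set
     * @param localReplicaId replica id of the local (observer) replica
     */
    static AutoJoinAction nextAction(Set<Integer> voterIds, int localReplicaId) {
        if (voterIds.contains(localReplicaId)) {
            // The id is in the voter set, yet this replica is an observer, so the
            // entry must belong to an old incarnation with a different directory id.
            return AutoJoinAction.REMOVE_OLD_VOTER;
        } else {
            // No entry for this id: the local replica can ask to be added.
            return AutoJoinAction.ADD_LOCAL_REPLICA;
        }
    }
}
```

Because the caller is already known to be an observer, comparing replica ids is enough, which is why the dedicated `VoterSet` lookups by directory id would no longer be needed under this approach.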
##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -42,6 +42,8 @@ public class FollowerState implements EpochState {
     private final Timer fetchTimer;
     // Used to track when to send another update voter request
     private final Timer updateVoterPeriodTimer;
+    // Used to track when to send another add or remove voter request
+    private final Timer addRemoveVoterPeriodTimer;

Review Comment:
   Is there a reason why you added a new timer? It currently has the same period as the update voter timer, and the two sets of RPCs are mutually exclusive: following voters send update voter requests once per period, while following observers send add or remove voter requests once per period.

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3274,6 +3320,37 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
     private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
         if (state.hasFetchTimeoutExpired(currentTimeMs)) {
             return maybeSendFetchToAnyBootstrap(currentTimeMs);
+        } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
+            quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {

Review Comment:
   Okay. I think we should document why we require both `followersAlwaysFlush` and `autoJoinEnable` to be true.

##########
raft/src/main/java/org/apache/kafka/raft/VoterSet.java:
##########
@@ -337,6 +337,33 @@ public boolean supportsVersion(KRaftVersion version) {
             .allMatch(voter -> voter.supportsVersion(version));
     }
 
+    /**
+     * Gets the old voter's replica key for a given replica key if it exists.
+     * An old voter is a voter that has the same replica id as the given replica key, but a different directory id.
+     *
+     * @param replicaKey the replica key to check against
+     * @return the replica key with the same id but a different directory id, if present, Optional.empty() otherwise
+     */
+    public Optional<ReplicaKey> getOldVoterForReplicaKey(ReplicaKey replicaKey) {
+        return voters
+            .values()
+            .stream()
+            .map(VoterNode::voterKey)
+            .filter(voter -> voter.id() == replicaKey.id() && !voter.directoryId().equals(replicaKey.directoryId()))
+            .findFirst();
+    }
+
+    /**
+     * @param replicaKey the replica key to check against
+     * @return true if the voter set does not contain the given replica id, false otherwise
+     */
+    public boolean doesNotContainReplicaId(ReplicaKey replicaKey) {
+        return voters
+            .values()
+            .stream()
+            .noneMatch(voter -> voter.voterKey().id() == replicaKey.id());
+    }

Review Comment:
   I think we can remove these methods if you agree with my other comment.

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2618,6 +2656,14 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
                 handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
                 break;
 
+            case ADD_RAFT_VOTER:
+                handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
+                break;
+
+            case REMOVE_RAFT_VOTER:
+                handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
+                break;

Review Comment:
   Since KRaft can now send these RPCs, we need to add a case for them in `RaftUtil#errorResponse`.

##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -375,6 +377,11 @@ Builder withLocalListeners(Endpoints localListeners) {
             return this;
         }
 
+        Builder withAutoJoinEnabled(boolean autoJoinEnabled) {
+            this.autoJoinEnabled = autoJoinEnabled;
+            return this;
+        }

Review Comment:
   This could be `withAutoJoin` for the method name and `autoJoin` for the field name.

--
This is an automated message from the Apache Git Service.