jsancio commented on code in PR #19589:
URL: https://github.com/apache/kafka/pull/19589#discussion_r2070577759


##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -99,6 +100,13 @@ public class QuorumConfig {
     public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
     public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
 
+    public static final String QUORUM_AUTO_JOIN_ENABLE = QUORUM_PREFIX + "auto.join.enable";
+    public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "If set to true, controllers will remove the entry " +
+        "in the voters set that matches its replica id but does not match its directory id if it exists by sending " +
+        "the RemoveVoter RPC. When no old entry for the controller exists in the voter set, it will then add itself " +
+        "by sending a AddVoter RPC to the leader.";

Review Comment:
   This documentation will be read by end users. They are not expected to understand the algorithm, and they don't need to. We could just use the paragraph from the KIP:
   > Controls whether a KRaft controller should automatically join the cluster metadata partition for its cluster id. If the configuration is set to true the controller must be stopped before removing the controller with kafka-metadata-quorum remove-controller.
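
   Here is a rough sketch of the constants with the KIP wording used as the user-facing doc. The holder class name `AutoJoinConfigSketch` is hypothetical. The `"controller.quorum."` value of `QUORUM_PREFIX` is an assumption and does not appear in this diff.

   ```java
   // Sketch only: the class name is hypothetical and the QUORUM_PREFIX value is assumed.
   final class AutoJoinConfigSketch {
       static final String QUORUM_PREFIX = "controller.quorum.";
       static final String QUORUM_AUTO_JOIN_ENABLE = QUORUM_PREFIX + "auto.join.enable";
       // User-facing description taken from the KIP paragraph: it says what the
       // setting does and what the operator must do, not how the client implements it.
       static final String QUORUM_AUTO_JOIN_ENABLE_DOC =
           "Controls whether a KRaft controller should automatically join the cluster metadata " +
           "partition for its cluster id. If the configuration is set to true the controller must " +
           "be stopped before removing the controller with kafka-metadata-quorum remove-controller.";
   }
   ```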



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3274,6 +3320,37 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
     private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
         if (state.hasFetchTimeoutExpired(currentTimeMs)) {
             return maybeSendFetchToAnyBootstrap(currentTimeMs);
+        } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
+            quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
+            var voters = partitionState.lastVoterSet();
+            var localReplicaKey = quorum.localReplicaKeyOrThrow();
+            final boolean resetAddRemoveVoterTimer;
+            final long backoffMs;
+
+            Optional<ReplicaKey> oldVoter = voters.getOldVoterForReplicaKey(localReplicaKey);

Review Comment:
   How about:
   
   ```java
   var voterIds = voters.voterIds();
   if (voterIds.contains(localReplicaKey.id())) {
       /* The replica id is in the voter set, but the local replica is not a voter. Remove the old voter.
        * The local replica is not in the voter set because it is fetching as an observer.
        */
       ...
   } else {
       // The replica id is not in the voter set. Add the local replica.
       ...
   }
   ```
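
   Below is a minimal, self-contained sketch of this branching. It uses simplified stand-ins: `ReplicaKeySketch`, `AutoJoinAction`, and `AutoJoinDecision` are hypothetical, and the directory id is modeled as a non-null `java.util.UUID` rather than Kafka's `Optional<Uuid>`. The `NONE` branch is a defensive addition that is not part of the suggestion.

   ```java
   import java.util.Map;
   import java.util.UUID;

   // Hypothetical stand-in for Kafka's ReplicaKey; directory id simplified to a non-null UUID.
   final class ReplicaKeySketch {
       final int id;
       final UUID directoryId;

       ReplicaKeySketch(int id, UUID directoryId) {
           this.id = id;
           this.directoryId = directoryId;
       }
   }

   enum AutoJoinAction { REMOVE_OLD_VOTER, ADD_LOCAL_REPLICA, NONE }

   final class AutoJoinDecision {
       // Decides what an observer with auto-join enabled should send next.
       // votersById maps each voter's replica id to its full replica key.
       static AutoJoinAction decide(Map<Integer, ReplicaKeySketch> votersById, ReplicaKeySketch local) {
           ReplicaKeySketch existing = votersById.get(local.id);
           if (existing == null) {
               // Replica id is not in the voter set: ask the leader to add the local replica.
               return AutoJoinAction.ADD_LOCAL_REPLICA;
           }
           if (existing.directoryId.equals(local.directoryId)) {
               // Defensive guard (not in the suggestion): the exact key is already a voter.
               return AutoJoinAction.NONE;
           }
           // Same id, different directory id: an old incarnation of this replica
           // is still a voter, so remove it first.
           return AutoJoinAction.REMOVE_OLD_VOTER;
       }
   }
   ```

   With this shape, a single lookup by id replaces both `getOldVoterForReplicaKey` and `doesNotContainReplicaId`.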



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3328,6 +3405,23 @@ private UpdateRaftVoterRequestData buildUpdateVoterRequest() {
         );
     }
 
+    private AddRaftVoterRequestData buildAddVoterRequest() {
+        return RaftUtil.addVoterRequest(
+            clusterId,
+            // TODO: What to set the AddVoterRequest timeout to?
+            10000,

Review Comment:
   Make it the request timeout.



##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -161,6 +172,10 @@ public int appendLingerMs() {
         return appendLingerMs;
     }
 
+    public boolean autoJoinEnable() {
+        return autoJoinEnable;
+    }

Review Comment:
   This could just be `autoJoin`. For both the method name and the field name.



##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -42,6 +42,8 @@ public class FollowerState implements EpochState {
     private final Timer fetchTimer;
     // Used to track when to send another update voter request
     private final Timer updateVoterPeriodTimer;
+    // Used to track when to send another add or remove voter request
+    private final Timer addRemoveVoterPeriodTimer;

Review Comment:
   Is there a reason why you added a new timer? It currently has the same 
period as update and both sets of RPCs are mutually exclusive. Following voters 
would send update voter in a period while following observers would send add or 
remove in a period.
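
   Here is a rough sketch of one timer serving both cases. `SharedVoterRpcTimer` and `PeriodicVoterRpc` are hypothetical names. In `FollowerState`, the existing `updateVoterPeriodTimer` would presumably play this role.

   ```java
   // Hypothetical sketch: a single periodic deadline shared by both RPC families.
   enum PeriodicVoterRpc { UPDATE_VOTER, ADD_OR_REMOVE_VOTER }

   final class SharedVoterRpcTimer {
       private final long periodMs;
       private long deadlineMs;

       SharedVoterRpcTimer(long periodMs, long nowMs) {
           this.periodMs = periodMs;
           this.deadlineMs = nowMs + periodMs;
       }

       // Returns the RPC family to send now, or null if the period has not elapsed.
       // When it polls, a follower is either a voter or an observer, never both,
       // so the two families never compete for the same deadline.
       PeriodicVoterRpc poll(boolean localIsVoter, long nowMs) {
           if (nowMs < deadlineMs) {
               return null;
           }
           deadlineMs = nowMs + periodMs;
           return localIsVoter ? PeriodicVoterRpc.UPDATE_VOTER : PeriodicVoterRpc.ADD_OR_REMOVE_VOTER;
       }
   }
   ```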



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3274,6 +3320,37 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
     private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
         if (state.hasFetchTimeoutExpired(currentTimeMs)) {
             return maybeSendFetchToAnyBootstrap(currentTimeMs);
+        } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
+            quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {

Review Comment:
   Okay. I think we should document why we require both `followersAlwaysFlush` 
and `autoJoinEnable` to be true.
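
   One way to capture that documentation is a named predicate with a doc comment. This is a sketch only: `AutoJoinGate` is hypothetical. The stated reason for `followersAlwaysFlush` is also an assumption the author should confirm. That reason is that the flag is true only for controllers, and that voters must fsync every append because the leader uses their fetch offsets to advance the high watermark.

   ```java
   // Hypothetical helper; in the PR the condition is written inline in pollFollowerAsObserver.
   final class AutoJoinGate {
       /**
        * Whether an observer should attempt to auto-join the voter set now.
        *
        * @param reconfigSupported    the kraft.version in the log supports AddVoter/RemoveVoter
        * @param followersAlwaysFlush assumed true only for controllers; a replica that does not
        *                             fsync every append must not become a voter, because the
        *                             leader uses voters' fetch offsets to advance the high watermark
        * @param autoJoin             the operator enabled QUORUM_AUTO_JOIN_ENABLE
        * @param periodExpired        the add/remove voter period has elapsed
        */
       static boolean shouldTryAutoJoin(boolean reconfigSupported, boolean followersAlwaysFlush,
                                        boolean autoJoin, boolean periodExpired) {
           return reconfigSupported && followersAlwaysFlush && autoJoin && periodExpired;
       }
   }
   ```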



##########
raft/src/main/java/org/apache/kafka/raft/VoterSet.java:
##########
@@ -337,6 +337,33 @@ public boolean supportsVersion(KRaftVersion version) {
             .allMatch(voter -> voter.supportsVersion(version));
     }
 
+    /**
+     * Gets the old voter's replica key for a given replica key if it exists.
+     * An old voter is a voter that has the same replica id as the given replica key, but a different directory id.
+     *
+     * @param replicaKey the replica key to check against
+     * @return the replica key with the same id but a different directory id, if present, Optional.empty() otherwise
+     */
+    public Optional<ReplicaKey> getOldVoterForReplicaKey(ReplicaKey replicaKey) {
+        return voters
+            .values()
+            .stream()
+            .map(VoterNode::voterKey)
+            .filter(voter -> voter.id() == replicaKey.id() && !voter.directoryId().equals(replicaKey.directoryId()))
+            .findFirst();
+    }
+
+    /**
+     * @param replicaKey the replica key to check against
+     * @return true if the voter set does not contain the given replica id, false otherwise
+     */
+    public boolean doesNotContainReplicaId(ReplicaKey replicaKey) {
+        return voters
+            .values()
+            .stream()
+            .noneMatch(voter -> voter.voterKey().id() == replicaKey.id());
+    }

Review Comment:
   I think we can remove these methods if you agree with my other comment.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2618,6 +2656,14 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
                 handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
                 break;
 
+            case ADD_RAFT_VOTER:
+                handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
+                break;
+
+            case REMOVE_RAFT_VOTER:
+                handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
+                break;

Review Comment:
   Since KRaft can now send these RPCs, we need to add cases for them in `RaftUtil#errorResponse`.
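
   Here is a simplified sketch of the missing cases. The enum and classes are hypothetical stand-ins for Kafka's `ApiKeys` and the generated response data classes. The sketch assumes `RaftUtil#errorResponse` is used to synthesize a response when an outbound request fails, for example on disconnect. Under that assumption, every request type KRaft sends needs a case there. In the real method, each case would presumably return the matching response data type with its error code set.

   ```java
   // Hypothetical, simplified model; ApiKeySketch and ErrorResponseSketch are not Kafka classes.
   enum ApiKeySketch { VOTE, FETCH, UPDATE_RAFT_VOTER, ADD_RAFT_VOTER, REMOVE_RAFT_VOTER, API_VERSIONS }

   final class ErrorResponseSketch {
       final ApiKeySketch apiKey;
       final short errorCode;

       ErrorResponseSketch(ApiKeySketch apiKey, short errorCode) {
           this.apiKey = apiKey;
           this.errorCode = errorCode;
       }
   }

   final class ErrorResponses {
       // Builds an error response for a request type the raft client sends.
       // A type without a case falls through to the exception below.
       static ErrorResponseSketch errorResponse(ApiKeySketch apiKey, short errorCode) {
           switch (apiKey) {
               case VOTE:
               case FETCH:
               case UPDATE_RAFT_VOTER:
               case ADD_RAFT_VOTER:    // new: sent by observers with auto-join enabled
               case REMOVE_RAFT_VOTER: // new: sent to remove a stale voter with the same id
                   return new ErrorResponseSketch(apiKey, errorCode);
               default:
                   throw new IllegalArgumentException("Unexpected request type: " + apiKey);
           }
       }
   }
   ```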



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -375,6 +377,11 @@ Builder withLocalListeners(Endpoints localListeners) {
             return this;
         }
 
+        Builder withAutoJoinEnabled(boolean autoJoinEnabled) {
+            this.autoJoinEnabled = autoJoinEnabled;
+            return this;
+        }

Review Comment:
   This could be `withAutoJoin` for the method name and `autoJoin` for the 
field name.



-- 
This is an automated message from the Apache Git Service.