jsancio commented on code in PR #19589:
URL: https://github.com/apache/kafka/pull/19589#discussion_r2263100630


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3271,13 +3317,57 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
             backoffMs,
             Math.min(
                 state.remainingFetchTimeMs(currentTimeMs),
-                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+                state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
             )
         );
     }
 
+    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
+            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+    }
+
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
-        return maybeSendFetchToBestNode(state, currentTimeMs);
+        GracefulShutdown shutdown = this.shutdown.get();
+        final long backoffMs;
+        if (shutdown != null) {
+            // If we are an observer, then we can shutdown immediately. We 
want to
+            // skip potentially sending any add or remove voter RPCs.
+            backoffMs = 0;
+        } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+            /* Only replicas that can become a voter and are configured to 
auto join should
+             * attempt to automatically join the voter set for the configured 
topic partition.
+             */
+            final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+            final var voters = partitionState.lastVoterSet();
+            final RequestSendResult sendResult;
+            if (voters.voterIds().contains(localReplicaKey.id())) {
+                /* Replica id is in the voter set but replica is not voter. 
Remove old voter.
+                 * Local replica is not in the voter set because the replica 
is an observer.
+                 */
+                final var oldVoter = voters.voterKeys()
+                    .stream()
+                    .filter(replicaKey -> replicaKey.id() == 
localReplicaKey.id())
+                    .findFirst()
+                    .get();
+                sendResult = maybeSendRemoveVoterRequest(state, oldVoter, 
currentTimeMs);
+            } else {
+                sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
+            }
+            backoffMs = sendResult.timeToWaitMs();
+            if (sendResult.requestSent()) {
+                state.resetUpdateVoterSetPeriod(currentTimeMs);
+            }
+        } else {
+            backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
+        }
+        return Math.min(
+            backoffMs,
+            Math.min(
+                state.remainingFetchTimeMs(currentTimeMs),

Review Comment:
   Observer don't need to backoff until the fetch timeout since observer do not 
read or handle fetch timeouts.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3271,13 +3317,57 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
             backoffMs,
             Math.min(
                 state.remainingFetchTimeMs(currentTimeMs),
-                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+                state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
             )
         );
     }
 
+    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
+            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+    }
+
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
-        return maybeSendFetchToBestNode(state, currentTimeMs);
+        GracefulShutdown shutdown = this.shutdown.get();
+        final long backoffMs;
+        if (shutdown != null) {
+            // If we are an observer, then we can shutdown immediately. We 
want to
+            // skip potentially sending any add or remove voter RPCs.
+            backoffMs = 0;
+        } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+            /* Only replicas that can become a voter and are configured to 
auto join should
+             * attempt to automatically join the voter set for the configured 
topic partition.
+             */

Review Comment:
   See my other comment but you can move this comment to 
`shouldSendAddOrRemoveVoterRequest`.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3271,13 +3317,57 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
             backoffMs,
             Math.min(
                 state.remainingFetchTimeMs(currentTimeMs),
-                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+                state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
             )
         );
     }
 
+    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
+            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);

Review Comment:
   Please document why we need this predicate. See 
`shouldSendUpdateVoteRequest` for an example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to