kevin-wu24 commented on code in PR #19589:
URL: https://github.com/apache/kafka/pull/19589#discussion_r2276475705


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3271,13 +3317,56 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
             backoffMs,
             Math.min(
                 state.remainingFetchTimeMs(currentTimeMs),
-                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+                state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
             )
         );
     }
 
+    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+        /* When the cluster supports reconfiguration, only replicas that can 
become a voter
+         * and are configured to auto join should attempt to automatically 
join the voter
+         * set for the configured topic partition.
+         */
+        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
+            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+    }
+
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
-        return maybeSendFetchToBestNode(state, currentTimeMs);
+        GracefulShutdown shutdown = this.shutdown.get();
+        final long backoffMs;
+        if (shutdown != null) {
+            // If we are an observer, then we can shutdown immediately. We 
want to
+            // skip potentially sending any add or remove voter RPCs.
+            backoffMs = 0;
+        } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+            final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+            final var voters = partitionState.lastVoterSet();
+            final RequestSendResult sendResult;
+            if (voters.voterIds().contains(localReplicaKey.id())) {
+                /* The replica's id is in the voter set but the replica is not 
a voter because
+                 * the directory id of the voter set entry is different. 
Remove the old voter.
+                 * Local replica is not in the voter set because the replica 
is an observer.
+                 */
+                final var oldVoter = voters.voterKeys()
+                    .stream()
+                    .filter(replicaKey -> replicaKey.id() == 
localReplicaKey.id())
+                    .findFirst()
+                    .get();
+                sendResult = maybeSendRemoveVoterRequest(state, oldVoter, 
currentTimeMs);
+            } else {
+                sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
+            }
+            backoffMs = sendResult.timeToWaitMs();
+            if (sendResult.requestSent()) {
+                state.resetUpdateVoterSetPeriod(currentTimeMs);
+            }
+        } else {
+            backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
+        }
+        return Math.min(
+            backoffMs,
+            state.remainingUpdateVoterSetPeriodMs(currentTimeMs)

Review Comment:
   Yeah, for a broker, this `return` statement will eventually always return 0 
since we have no way to reset it. I don't think conceptually it makes sense to 
reset that timer upon receiving a FetchResponse. Instead we should not read 
that timer's value when determining the backoff if we are a broker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to