ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1900580715


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3026,20 +3099,51 @@ private long pollCandidate(long currentTimeMs) {
             return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
         } else if (state.isBackingOff()) {
             if (state.isBackoffComplete(currentTimeMs)) {
-                logger.info("Re-elect as candidate after election backoff has 
completed");
-                transitionToCandidate(currentTimeMs);
+                logger.info("Transition to prospective after election backoff 
has completed");
+                transitionToProspective(currentTimeMs);
                 return 0L;
             }
             return state.remainingBackoffMs(currentTimeMs);
         } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
-            long backoffDurationMs = 
binaryExponentialElectionBackoffMs(state.retries());
-            logger.info("Election has timed out, backing off for {}ms before 
becoming a candidate again",
-                backoffDurationMs);
-            state.startBackingOff(currentTimeMs, backoffDurationMs);
-            return backoffDurationMs;
+            logger.info("Election was not granted, transitioning to 
prospective");
+            transitionToProspective(currentTimeMs);
+            return 0L;
         } else {
+            long minVoteRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
+            return Math.min(minVoteRequestBackoffMs, 
state.remainingElectionTimeMs(currentTimeMs));
+        }
+    }
+
+    private long pollProspective(long currentTimeMs) {
+        ProspectiveState state = quorum.prospectiveStateOrThrow();
+        GracefulShutdown shutdown = this.shutdown.get();
+
+        if (shutdown != null) {
             long minRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
-            return Math.min(minRequestBackoffMs, 
state.remainingElectionTimeMs(currentTimeMs));
+            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
+        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
+            logger.info("Election timed out before receiving sufficient vote 
responses to become candidate");
+            prospectiveTransitionAfterElectionLoss(state, currentTimeMs);
+            return 0L;
+        } else {
+            long minVoteRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
+            return Math.min(minVoteRequestBackoffMs, 
state.remainingElectionTimeMs(currentTimeMs));
+        }
+    }
+
+    private void prospectiveTransitionAfterElectionLoss(ProspectiveState 
prospective, long currentTimeMs) {
+        if (prospective.election().hasLeader() && 
!prospective.leaderEndpoints().isEmpty()) {
+            logger.info(
+                "Transitioning to Follower of leader {}",

Review Comment:
   Replacing with code comments instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to