ahuang98 commented on code in PR #18240: URL: https://github.com/apache/kafka/pull/18240#discussion_r1900580715
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -3026,20 +3099,51 @@ private long pollCandidate(long currentTimeMs) { return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs); } else if (state.isBackingOff()) { if (state.isBackoffComplete(currentTimeMs)) { - logger.info("Re-elect as candidate after election backoff has completed"); - transitionToCandidate(currentTimeMs); + logger.info("Transition to prospective after election backoff has completed"); + transitionToProspective(currentTimeMs); return 0L; } return state.remainingBackoffMs(currentTimeMs); } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { - long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries()); - logger.info("Election has timed out, backing off for {}ms before becoming a candidate again", - backoffDurationMs); - state.startBackingOff(currentTimeMs, backoffDurationMs); - return backoffDurationMs; + logger.info("Election was not granted, transitioning to prospective"); + transitionToProspective(currentTimeMs); + return 0L; } else { + long minVoteRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); + return Math.min(minVoteRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); + } + } + + private long pollProspective(long currentTimeMs) { + ProspectiveState state = quorum.prospectiveStateOrThrow(); + GracefulShutdown shutdown = this.shutdown.get(); + + if (shutdown != null) { long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); - return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); + return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs); + } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { + logger.info("Election timed out before receiving sufficient vote responses to become candidate"); + prospectiveTransitionAfterElectionLoss(state, currentTimeMs); + return 0L; + } else { + long minVoteRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); + return Math.min(minVoteRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); + } + } + + private void prospectiveTransitionAfterElectionLoss(ProspectiveState prospective, long currentTimeMs) { + if (prospective.election().hasLeader() && !prospective.leaderEndpoints().isEmpty()) { + logger.info( + "Transitioning to Follower of leader {}", Review Comment: Replacing with code comments instead -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org