jsancio commented on code in PR #19416:
URL: https://github.com/apache/kafka/pull/19416#discussion_r2054252657


##########
raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java:
##########
@@ -104,26 +111,39 @@ public CompletableFuture<UpdateRaftVoterResponseData> 
handleUpdateVoterRequest(
             );
         }
 
-        // KAFKA-16538 will implement the case when the kraft.version is 0
-        // Check that the cluster supports kraft.version >= 1
+        // Read the voter set from the log or leader state
         KRaftVersion kraftVersion = partitionState.lastKraftVersion();
-        if (!kraftVersion.isReconfigSupported()) {
-            return CompletableFuture.completedFuture(
-                RaftUtil.updateVoterResponse(
-                    Errors.UNSUPPORTED_VERSION,
-                    requestListenerName,
-                    new LeaderAndEpoch(
-                        localId,
-                        leaderState.epoch()
-                    ),
-                    leaderState.leaderEndpoints()
-                )
-            );
-        }
+        final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
+        final Optional<VoterSet> voters;
+        if (kraftVersion.isReconfigSupported()) {
+            inMemoryVoters = Optional.empty();
 
-        // Check that there are no uncommitted VotersRecord
-        Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
-        if (votersEntry.isEmpty() || votersEntry.get().offset() >= 
highWatermark.get()) {
+            // Check that there are no uncommitted VotersRecord
+            Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
+            if (votersEntry.isEmpty() || votersEntry.get().offset() >= 
highWatermark.get()) {
+                voters = Optional.empty();
+            } else {
+                voters = votersEntry.map(LogHistory.Entry::value);
+            }
+        } else {
+            inMemoryVoters = leaderState.volatileVoters();
+            if (inMemoryVoters.isEmpty()) {

Review Comment:
   This can happen if the a remove voter send an update voter request before 
the leader has written the kraft version to the log and applied to the 
`partitionState`. The general flow for upgrade is:
   1. Controller thread generates the control records and appends them to the 
batch accumulator
   2. The kraft thread collects the control records in the batch accumulator, 
appends them to the log and updates the partitions state.
   
   The kraft.version used here is not updated until (2.). Any update voter 
request handled between 1. and 2. the leader is asking the remove replica to 
retry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to