kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2565395726


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3285,6 +3301,9 @@ private void 
prospectiveTransitionAfterElectionLoss(ProspectiveState prospective
     private long pollFollower(long currentTimeMs) {
         FollowerState state = quorum.followerStateOrThrow();
         if (quorum.isVoter()) {
+            if (quorumConfig.autoJoin()) {
+                hasJoined = true;
+            }

Review Comment:
   We can remove this code.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,14 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the 
initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if the the VotersRecord with the highest offset contain the 
node id of this node,
+            // mark it as already joined because it is already in the voter 
set.
+            // Check using ReplicaKey (id + directoryId) to handle KIP-853 
properly
+            hasJoined = partitionState.lastVoterSet().isVoter(
+                ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId)
+            );
+        }

Review Comment:
   Here we should set `hasJoined = Optional.of(true)` if and only if 
`partitionState.lastVoterSet()` is a singleton set containing only the local 
`ReplicaKey`.
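
   A minimal sketch of that singleton check, assuming the voter set can be 
   viewed as a plain `Set` of keys. `SingletonVoterCheck` and `isOnlyVoter` are 
   hypothetical names for illustration and are not part of `KafkaRaftClient` or 
   `VoterSet`; the generic key type stands in for `ReplicaKey`.

   ```java
   import java.util.Set;

   // Hypothetical helper, not Kafka API: K stands in for ReplicaKey.
   final class SingletonVoterCheck {
       // True only when the voter set has exactly one member and that
       // member is the local replica key.
       static <K> boolean isOnlyVoter(Set<K> voterKeys, K localKey) {
           return voterKeys.size() == 1 && voterKeys.contains(localKey);
       }
   }
   ```

   With a check like this, `initialize()` would move to HAS_JOINED only for a 
   standalone voter and leave every other case as UNKNOWN until the node has 
   fetched.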



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3352,8 +3371,8 @@ private boolean 
shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
          * and are configured to auto join should attempt to automatically 
join the voter
          * set for the configured topic partition.
          */
-        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
-            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+        return !hasJoined && 
partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter

Review Comment:
   ```suggestion
           return hasJoined.isPresent() && !hasJoined.get() && 
partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -222,6 +222,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     private volatile RemoveVoterHandler removeVoterHandler;
     private volatile UpdateVoterHandler updateVoterHandler;
 
+    private volatile boolean hasJoined = false;

Review Comment:
   I have an idea for how we can model this state properly. We can use an 
`Optional<Boolean>`, which lets us represent 3 distinct "hasJoined" states:
   
   1. UNKNOWN: We'll use Optional.empty(), which applies to all nodes on 
startup, including brokers
   2. HAS_NOT_JOINED: We'll use Optional.of(false), which only controllers can 
transition to.
   3. HAS_JOINED: We'll use Optional.of(true), which only controllers can 
transition to.
   
   The possible state transitions for controllers are:
   
   1. UNKNOWN -> HAS_JOINED: This occurs when the controller fetches, is 
"up-to-date" with the HWM, and discovers it IS in the voter set, or if the 
controller is the only member of the voter set when starting up.
   2. UNKNOWN -> HAS_NOT_JOINED: This occurs when the controller fetches, is 
"up-to-date" with the HWM, and discovers it is NOT in the voter set. 
   3. HAS_NOT_JOINED -> HAS_JOINED: This also occurs when the controller 
fetches, is "up-to-date" with the HWM, and discovers it is in the voter set.
   
   Importantly, state transition 2 can occur at most once in the lifetime of 
the controller process, because nothing transitions back to UNKNOWN once the 
process has started. Additionally, the controller process can never go from 
HAS_JOINED -> HAS_NOT_JOINED.
   
   `shouldSendAddOrRemoveVoterRequest` will be conditioned on the state being 
`HAS_NOT_JOINED`. Therefore, the two invariants above mean that, during its 
process lifetime, a controller will not try to auto-join again after being 
removed from the voter set.
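
   The transitions above could be sketched roughly as follows. This is only an 
   illustration under assumptions: `JoinStateTracker`, `onCaughtUp`, and 
   `shouldAttemptAutoJoin` are hypothetical names, and the real change would 
   live inside `KafkaRaftClient` rather than in a separate class.

   ```java
   import java.util.Optional;

   // Hypothetical helper, not part of KafkaRaftClient.
   final class JoinStateTracker {
       // Optional.empty()   = UNKNOWN
       // Optional.of(false) = HAS_NOT_JOINED
       // Optional.of(true)  = HAS_JOINED
       private volatile Optional<Boolean> hasJoined = Optional.empty();

       // Call once the local replica is caught up to the high watermark.
       void onCaughtUp(boolean isInVoterSet) {
           if (hasJoined.isPresent() && hasJoined.get()) {
               return; // HAS_JOINED never reverts to HAS_NOT_JOINED
           }
           if (isInVoterSet) {
               // UNKNOWN -> HAS_JOINED or HAS_NOT_JOINED -> HAS_JOINED
               hasJoined = Optional.of(true);
           } else if (!hasJoined.isPresent()) {
               // UNKNOWN -> HAS_NOT_JOINED, which can happen at most once
               hasJoined = Optional.of(false);
           }
       }

       // Auto-join is attempted only in the HAS_NOT_JOINED state.
       boolean shouldAttemptAutoJoin() {
           return hasJoined.isPresent() && !hasJoined.get();
       }

       Optional<Boolean> state() {
           return hasJoined;
       }
   }
   ```

   In this sketch, a controller that joined and was later removed stays in 
   HAS_JOINED, so `shouldAttemptAutoJoin()` keeps returning false for the rest 
   of the process lifetime.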



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1805,6 +1809,19 @@ private boolean handleFetchResponse(
                 OptionalLong highWatermark = partitionResponse.highWatermark() 
< 0 ?
                     OptionalLong.empty() : 
OptionalLong.of(partitionResponse.highWatermark());
                 updateFollowerHighWatermark(state, highWatermark);
+
+                if (initHighWatermark < 0 && partitionResponse.highWatermark() 
>= 0) {
+                    initHighWatermark = partitionResponse.highWatermark();
+
+                    state.highWatermark().ifPresent(hw -> {
+                            if (hw.offset() >= initHighWatermark && 
nodeId.isPresent()) {
+                                hasJoined = 
partitionState.lastVoterSet().isVoter(
+                                    ReplicaKey.of(nodeId.getAsInt(), 
nodeDirectoryId));
+                            }
+                        }
+                    );

Review Comment:
   Hmm, this `if` check seems incorrect. Once `initHighWatermark` is set to a 
non-negative value, `initHighWatermark < 0` is never true again. Doesn't this 
mean we won't ever enter this block again?
   
   We should be checking the `highWatermark` against our local log end offset 
(LEO), and if our LEO >= HWM, the local node considers itself "caught up" and 
can set the `hasJoined` state to the proper value. I believe this is correct 
because we can have at most one uncommitted voters record at a time. 
   
   Importantly, we need to enforce here the state transition requirements from 
my comment above.
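
   A minimal sketch of the "caught up" test, assuming the caller can supply the 
   local log end offset and the follower's known high watermark. `CaughtUpCheck` 
   and `isCaughtUp` are hypothetical names, not existing Kafka methods.

   ```java
   import java.util.OptionalLong;

   // Hypothetical helper, not part of KafkaRaftClient.
   final class CaughtUpCheck {
       // True when a high watermark is known and the local log end offset
       // (LEO) has reached or passed it.
       static boolean isCaughtUp(long logEndOffset, OptionalLong highWatermark) {
           return highWatermark.isPresent()
               && logEndOffset >= highWatermark.getAsLong();
       }
   }
   ```

   Because this is re-evaluated on every fetch response rather than latched by 
   a one-time flag, the `hasJoined` update can still happen on a later fetch if 
   the node is not caught up on the first one.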



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
