kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2561593244
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -105,7 +105,7 @@ public class QuorumConfig {
public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX
+ "auto.join.enable";
public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether
a KRaft controller should automatically " +
- "join the cluster metadata partition for its cluster id.";
+ "join the cluster metadata partition for its cluster id when the node
startup.";
Review Comment:
"Controls whether a KRaft controller (identified by its <nodeId, directoryUUID> tuple) should
automatically join the cluster metadata partition as a voter during node
startup if the controller is not already a voter."
There is probably a better way to word the above, but
this most accurately describes what we are changing the semantics to. I think
I am okay with this definition of the semantics from a UX POV. I guess we do not
really care whether the `ReplicaKey` was added by auto-join or not, just that it was
added as a voter.
Our proposed semantics essentially make the auto-join code run "at most
once" for the lifetime of a given controller process execution, conditioned on
the local `ReplicaKey` not already being part of our voter set. However, these semantics
can run into UX issues too.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java:
##########
@@ -317,6 +321,131 @@ private void pollAndDeliverFetchToUpdateVoterSet(
context.client.poll();
}
+ @Test
+ public void testBootstrapVoterSetDoesNotSendAddVoterAfterRemove() throws
Exception {
+ // Test that a bootstrap voter node, after being removed by
RemoveVoter RPC,
+ // will NOT automatically rejoin (canAutoJoin remains false)
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var follower = replicaKey(leader.id() + 1, true);
+ final var thisVoter = replicaKey(follower.id() + 1, true);
+
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ thisVoter.id(),
+ thisVoter.directoryId().get()) // Same directory ID -> node is
voter
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, follower, thisVoter)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(true)
+ .build();
+
+ // Node should be a follower (voter) initially
+ assertTrue(context.client.quorum().isFollower());
+
+ // Complete initial fetch
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // Simulate this node being removed by RemoveVoter RPC (initiated by
another node)
+ // Update voter set via fetch - this node is now removed from voter set
+ pollAndDeliverFetchToUpdateVoterSet(
+ context,
+ epoch,
+ VoterSetTest.voterSet(Stream.of(leader, follower)) // This node
is no longer in voter set
+ );
+
+ // Node should now be an observer after being removed
+ assertTrue(context.client.quorum().isObserver());
+
+ // Advance time to expire update voter set timer
+ context.time.sleep(context.fetchTimeoutMs);
+
+ // Poll and verify that NO AddVoter request is sent
+ // With canAutoJoin=false (set during init because ID was in voter
set),
+ // node should NOT automatically rejoin
+ context.time.sleep(1);
+ context.pollUntilRequest();
+
+ // Should not include AddVoter
+ assertEquals(0,
context.channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER)).size());
+
+ }
+
+ @Test
+ public void testSuccessfulAddVoterSetsCanAutoJoinAgain() throws Exception {
+ // Test scenario:
+ // 1. Node initializes as observer
+ // 2. Node joins as follower via AddVoter
+ // 3. Node is removed via RemoveVoter and becomes observer again
+ // 4. Node should NOT send AddVoter request again
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var follower = replicaKey(leader.id() + 1, true);
+ final var newVoter = replicaKey(follower.id() + 1, true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ newVoter.id(),
+ newVoter.directoryId().get())
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, follower)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(true)
+ .build();
+
+ // 1. Node should be an observer initially
+ assertTrue(context.client.quorum().isObserver());
+
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // 2. Send AddVoter request to join cluster
+ final var addVoterRequest = pollAndSendAddVoter(context, newVoter);
+
+ // Successfully add voter - this should set canAutoJoin to false
+ context.deliverResponse(
+ addVoterRequest.correlationId(),
+ addVoterRequest.destination(),
+ RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
+ );
+
+ // Poll to process the response
+ context.client.poll();
+
+ // Update voter set via fetch - node is now a voter
+ pollAndDeliverFetchToUpdateVoterSet(
+ context,
+ epoch,
+ VoterSetTest.voterSet(Stream.of(leader, follower, newVoter))
+ );
+
+ // Node should now be a follower (voter)
+ assertTrue(context.client.quorum().isFollower());
+
+ // 3. Simulate this node being removed by RemoveVoter RPC
+ pollAndDeliverFetchToUpdateVoterSet(
+ context,
+ epoch,
+ VoterSetTest.voterSet(Stream.of(leader, follower)) // newVoter is
removed
+ );
+
+ // Node should now be an observer after being removed
+ assertTrue(context.client.quorum().isObserver());
+
+ // Advance time to expire update voter set timer
+ context.time.sleep(context.fetchTimeoutMs);
+
+ // 4. Poll and verify that NO AddVoter request is sent
+ // Because canAutoJoin was set to false after successful AddVoter
Review Comment:
Please update the comments that still refer to `canAutoJoin`, since the boolean has been renamed.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -222,6 +222,8 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
private volatile RemoveVoterHandler removeVoterHandler;
private volatile UpdateVoterHandler updateVoterHandler;
+ private volatile boolean hasAutoJoined = false;
Review Comment:
Let's rename this to `hasJoined`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]