kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2561429495
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,13 @@ public void initialize(
logger.info("Reading KRaft snapshot and log as part of the initialization");
partitionState.updateState();
logger.info("Starting voters are {}", partitionState.lastVoterSet());
+ if (nodeId.isPresent()) {
+ // if the starting voters contain the node id of this node, mark it as already joined
+ // because it is already in the voter set.
+ // Check using ReplicaKey (id + directoryId) to handle KIP-853 properly
+ ReplicaKey localReplicaKey = ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId);
+ hasAutoJoined = partitionState.lastVoterSet().isVoter(localReplicaKey);
Review Comment:
```suggestion
hasAutoJoined = partitionState.lastVoterSet().isVoter(
ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId)
);
```
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2335,8 +2344,11 @@ private boolean handleAddVoterResponse(
/* These error codes indicate the replica was successfully added or the leader is unable to
* process the request. In either case, reset the update voter set timer to back off.
*/
- if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT || error == Errors.DUPLICATE_VOTER) {
+ if (error == Errors.NONE) {
+ quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
+ hasAutoJoined = true;
Review Comment:
Take a look at this code from `AddVoterHandler#handleApiVersionsResponse`:
```
current.setLastOffset(leaderState.appendVotersRecord(newVoters, currentTimeMs));
if (!current.ackWhenCommitted()) { // we execute the body below when auto-join is enabled
    // complete the future to send response, but do not reset the state,
    // since the new voter set is not yet committed
    current.future().complete(RaftUtil.addVoterResponse(Errors.NONE, null));
}
```
When auto-join is enabled, we send the RPC response BEFORE the new voters
record is committed. The motivation behind this is detailed in:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1186%3A+Update+AddRaftVoterRequest+RPC+to+support+auto-join,
and is necessary for a fully available auto-join feature.
What this means here is that our local node may not actually have joined the voter set even though we set `hasAutoJoined = true`. For example, what if the remote leader fails over before the VotersRecord adding the local node is committed? That record may then be truncated, and because `hasAutoJoined` is already `true`, `shouldSendAddOrRemoveVoterRequest` never lets the node retry.
The "more correct" place to set `hasAutoJoined = true` is in `pollFollower` if we are in the `quorum.isVoter()` case AND `quorumConfig.autoJoin() == true`.
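To make the failover case concrete, here is a minimal sketch of it as a toy state machine. Everything in it is hypothetical: `ToyFollower`, `onAddVoterResponseNone`, `onPoll`, `shouldSendAddVoter` and `FailoverScenario` are illustrative names, not `KafkaRaftClient` methods, and the voter set is reduced to a `Set<Integer>` of node ids. It assumes the uncommitted VotersRecord is lost in the failover, so the voter set the follower sees afterwards does not include the local node.

```java
import java.util.Set;

// Hypothetical model only: ToyFollower is NOT KafkaRaftClient, and the voter set is
// reduced to plain node ids.
final class ToyFollower {
    private final int localId;
    private final boolean autoJoin;
    private boolean hasAutoJoined = false;

    ToyFollower(int localId, boolean autoJoin) {
        this.localId = localId;
        this.autoJoin = autoJoin;
    }

    // Shape of the PR: latch the flag as soon as the leader answers Errors.NONE,
    // even though the leader replies before the VotersRecord is committed.
    void onAddVoterResponseNone() {
        hasAutoJoined = true;
    }

    // Shape of the review suggestion: derive the flag while polling, from the voter
    // set the follower currently has, and only when auto-join is enabled.
    void onPoll(Set<Integer> currentVoters) {
        if (autoJoin && currentVoters.contains(localId)) {
            hasAutoJoined = true;
        }
    }

    boolean shouldSendAddVoter() {
        return autoJoin && !hasAutoJoined;
    }
}

final class FailoverScenario {
    // The leader appends the VotersRecord adding node 3, replies NONE, then fails over
    // before commit; in this scenario the record is gone afterwards, so the voter set
    // the follower observes is still {1, 2}.
    static boolean retriesAfterFailover(boolean latchOnResponse) {
        ToyFollower follower = new ToyFollower(3, true);
        if (latchOnResponse) {
            follower.onAddVoterResponseNone();
        }
        follower.onPoll(Set.of(1, 2));
        return follower.shouldSendAddVoter();
    }

    public static void main(String[] args) {
        System.out.println("latch on NONE response, retries: " + retriesAfterFailover(true));
        System.out.println("derive from voter set, retries: " + retriesAfterFailover(false));
    }
}
```

Running `main` prints `false` for the latched flag (the node never sends another AddRaftVoter request) and `true` for the flag derived from the voter set, which only flips once the follower actually observes itself as a voter.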
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3352,8 +3364,13 @@ private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
* and are configured to auto join should attempt to automatically join the voter
* set for the configured topic partition.
*/
- return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
- quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+ if (!partitionState.lastKraftVersion().isReconfigSupported() || !canBecomeVoter ||
+ !quorumConfig.autoJoin() || !state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
+ return false;
+ }
+
+ // Only attempt auto-join if we haven't already auto-joined
+ return !hasAutoJoined;
Review Comment:
```suggestion
return !hasAutoJoined &&
partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
```
IMO this reads cleaner. It also short-circuits once a node has successfully auto-joined and set `hasAutoJoined = true`, whereas the current code only reaches the `hasAutoJoined` check after evaluating all four other conditions.
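A quick way to check the ordering claim is to count evaluations. This sketch replaces the four preconditions with hypothetical counting suppliers (`ShortCircuitSketch` and its methods are illustrative, not Kafka code), so it only demonstrates Java's left-to-right short-circuiting of `&&` and `||`, plus the fact that both shapes return the same value for every input.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-ins: a, b, c, d play the roles of isReconfigSupported(),
// canBecomeVoter, autoJoin() and hasUpdateVoterSetPeriodExpired(...).
final class ShortCircuitSketch {
    // Wraps a constant so every evaluation is counted.
    static BooleanSupplier counted(boolean value, AtomicInteger calls) {
        return () -> {
            calls.incrementAndGet();
            return value;
        };
    }

    // Shape of the code under review: guard clause first, flag last.
    static boolean currentShape(boolean hasAutoJoined, BooleanSupplier a, BooleanSupplier b,
                                BooleanSupplier c, BooleanSupplier d) {
        if (!a.getAsBoolean() || !b.getAsBoolean() || !c.getAsBoolean() || !d.getAsBoolean()) {
            return false;
        }
        return !hasAutoJoined;
    }

    // Shape of the suggestion: flag first, so && stops immediately once joined.
    static boolean suggestedShape(boolean hasAutoJoined, BooleanSupplier a, BooleanSupplier b,
                                  BooleanSupplier c, BooleanSupplier d) {
        return !hasAutoJoined && a.getAsBoolean() && b.getAsBoolean()
            && c.getAsBoolean() && d.getAsBoolean();
    }

    // Precondition evaluations for an already-joined node when every precondition holds.
    static int evaluationsAfterJoin(boolean suggested) {
        AtomicInteger calls = new AtomicInteger();
        BooleanSupplier t = counted(true, calls);
        if (suggested) {
            suggestedShape(true, t, t, t, t);
        } else {
            currentShape(true, t, t, t, t);
        }
        return calls.get();
    }

    // Checks that both shapes agree on all 32 combinations of the five booleans.
    static boolean sameResultForAllInputs() {
        AtomicInteger ignored = new AtomicInteger();
        for (int mask = 0; mask < 32; mask++) {
            boolean joined = (mask & 16) != 0;
            BooleanSupplier a = counted((mask & 1) != 0, ignored);
            BooleanSupplier b = counted((mask & 2) != 0, ignored);
            BooleanSupplier c = counted((mask & 4) != 0, ignored);
            BooleanSupplier d = counted((mask & 8) != 0, ignored);
            if (currentShape(joined, a, b, c, d) != suggestedShape(joined, a, b, c, d)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("current shape evaluates " + evaluationsAfterJoin(false) + " preconditions");
        System.out.println("suggested shape evaluates " + evaluationsAfterJoin(true) + " preconditions");
        System.out.println("equivalent: " + sameResultForAllInputs());
    }
}
```

Running `main` reports 4 evaluations for the current shape and 0 for the suggested one once `hasAutoJoined` is `true`; since the results agree for every input, the reordering only affects readability and skipped work.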
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,13 @@ public void initialize(
logger.info("Reading KRaft snapshot and log as part of the initialization");
partitionState.updateState();
logger.info("Starting voters are {}", partitionState.lastVoterSet());
+ if (nodeId.isPresent()) {
+ // if the starting voters contain the node id of this node, mark it as already joined
Review Comment:
> if the starting voters contain the node id of this node, mark it as already joined
This comment is inaccurate. We are checking `lastVoterSet()`, which is the voter set from the VotersRecord with the highest offset, not the "starting voters." Setting `hasAutoJoined` properly here is where the complications of this new semantic become apparent.
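One way to picture the distinction is a voter-set history keyed by the offset of each VotersRecord. This is a hypothetical model, not the real `partitionState` API: `VoterHistorySketch`, `append`, and `committedVoterSet` are illustrative, and it assumes a record counts as committed once its offset is below the high watermark. It shows how the highest-offset voter set can include a node that the committed voter set does not.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model: voter sets keyed by the offset of the VotersRecord that
// introduced them. Not the real partitionState API.
final class VoterHistorySketch {
    private final TreeMap<Long, Set<Integer>> votersByOffset = new TreeMap<>();

    void append(long offset, Set<Integer> voters) {
        votersByOffset.put(offset, Set.copyOf(voters));
    }

    // Analogue of lastVoterSet(): the record with the highest offset, whether or not
    // it has been committed.
    Optional<Set<Integer>> lastVoterSet() {
        return Optional.ofNullable(votersByOffset.lastEntry()).map(Map.Entry::getValue);
    }

    // Assumption: a record is committed once its offset is below the high watermark.
    Optional<Set<Integer>> committedVoterSet(long highWatermark) {
        return Optional.ofNullable(votersByOffset.lowerEntry(highWatermark)).map(Map.Entry::getValue);
    }

    public static void main(String[] args) {
        VoterHistorySketch history = new VoterHistorySketch();
        history.append(0L, Set.of(1, 2));     // earlier voter set
        history.append(10L, Set.of(1, 2, 3)); // node 3 added, not yet committed
        long highWatermark = 5L;
        System.out.println(history.lastVoterSet().orElseThrow().contains(3));                    // true
        System.out.println(history.committedVoterSet(highWatermark).orElseThrow().contains(3)); // false
    }
}
```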
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,13 @@ public void initialize(
logger.info("Reading KRaft snapshot and log as part of the initialization");
partitionState.updateState();
logger.info("Starting voters are {}", partitionState.lastVoterSet());
+ if (nodeId.isPresent()) {
+ // if the starting voters contain the node id of this node, mark it as already joined
+ // because it is already in the voter set.
+ // Check using ReplicaKey (id + directoryId) to handle KIP-853 properly
+ ReplicaKey localReplicaKey = ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId);
+ hasAutoJoined = partitionState.lastVoterSet().isVoter(localReplicaKey);
Review Comment:
I want to point out that the last voter set containing the local `ReplicaKey` does not necessarily mean the node got there through auto-join (it may, for example, have been in the initial voter set), but in practice I think this is okay.
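The check in this hunk keys membership on both the node id and the directory id. The sketch below models that comparison with hypothetical stand-ins (`ToyReplicaKey` for `ReplicaKey`, `ReplicaKeySketch.isVoter` for `VoterSet#isVoter`); it ignores any special handling the real code may have for voters recorded without a directory id. A positive result only says the replica is in the set, not how it got there.

```java
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for ReplicaKey: a replica is identified by node id plus
// directory id. Not the real org.apache.kafka.raft API.
final class ToyReplicaKey {
    final int id;
    final UUID directoryId;

    ToyReplicaKey(int id, UUID directoryId) {
        this.id = id;
        this.directoryId = directoryId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ToyReplicaKey)) return false;
        ToyReplicaKey other = (ToyReplicaKey) o;
        return id == other.id && Objects.equals(directoryId, other.directoryId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, directoryId);
    }
}

final class ReplicaKeySketch {
    // Membership needs both fields to match. A hit does not record whether the
    // replica joined through auto-join or was already in the initial voter set.
    static boolean isVoter(Set<ToyReplicaKey> voters, ToyReplicaKey local) {
        return voters.contains(local);
    }

    public static void main(String[] args) {
        UUID originalDir = new UUID(0L, 1L);
        UUID replacedDir = new UUID(0L, 2L); // e.g. same node id after its log directory was replaced
        Set<ToyReplicaKey> voters = Set.of(new ToyReplicaKey(1, originalDir));
        System.out.println(isVoter(voters, new ToyReplicaKey(1, originalDir))); // true
        System.out.println(isVoter(voters, new ToyReplicaKey(1, replacedDir))); // false
    }
}
```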
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -105,7 +105,7 @@ public class QuorumConfig {
public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " +
- "join the cluster metadata partition for its cluster id.";
+ "join the cluster metadata partition for its cluster id when the node startup.";
Review Comment:
```suggestion
"join the cluster metadata partition for its cluster id during node startup. This property can only be `true` when `process.roles` contains `controller`.";
```
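The rule in the suggested doc string can be read as a simple startup check. This is a hypothetical helper (`AutoJoinConfigCheck.validate` is illustrative and its error message is made up); it does not claim to show where or how Kafka enforces the rule, and it treats `process.roles` as an already-parsed list of role names.

```java
import java.util.List;

// Hypothetical helper: illustrates the rule stated in the suggested doc string,
// not where or how Kafka actually enforces it.
final class AutoJoinConfigCheck {
    static void validate(List<String> processRoles, boolean autoJoinEnable) {
        if (autoJoinEnable && !processRoles.contains("controller")) {
            throw new IllegalArgumentException(
                "auto-join can only be enabled when process.roles contains controller, got " + processRoles);
        }
    }

    public static void main(String[] args) {
        validate(List.of("broker", "controller"), true); // combined node: allowed
        validate(List.of("broker"), false);              // broker with auto-join off: allowed
        try {
            validate(List.of("broker"), true);            // broker with auto-join on: rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```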
--
This is an automated message from the Apache Git Service.