ahuang98 commented on code in PR #19416:
URL: https://github.com/apache/kafka/pull/19416#discussion_r2047787462
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -97,11 +105,16 @@ public FeatureControlManager build() {
                         MetadataVersion.latestProduction().featureLevel()));
                 quorumFeatures = new QuorumFeatures(0, localSupportedFeatures, List.of(0));
             }
+            if (raftClient == null) {
+                throw new IllegalStateException("Must specify and raft client");

Review Comment:
   nit: "a raft client"

##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
+    /* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
+     * the clients to RaftClient.
+     * 1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in

Review Comment:
   thrown off a bit by the capitalization - are `Voters` and `Version` meant to reference variables in LeaderState that I'm just missing?

##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already

Review Comment:
   nit: upgrades

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2435,8 +2434,18 @@ private boolean handleUpdateVoterResponse(
             responseMetadata.source(),
             currentTimeMs
         );
+        if (handled.isPresent()) {

Review Comment:
   question about the current logic on L2415
   ```
   if (responseLeaderId.isPresent() && data.currentLeader().host().isEmpty()) {
   ```
   I'm confused why it seems we ignore the case where `responseLeaderId.isPresent() && !data.currentLeader().host().isEmpty()` - why do we set leaderEndpoints to empty in that case?
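On the `Voters`/`Version` question above: the quoted diff calls `kraftVersionUpgradeState.get().toVersion()`, `toVoters()`, and `new KRaftVersionUpgrade.Version(newVersion)`, which suggests both names are implementations of a `KRaftVersionUpgrade` type rather than fields of `LeaderState`. The sketch below is a hypothetical, simplified model of that shape (voter ids as a `Set<Integer>`, the kraft version as an `int`, and a made-up `UpgradeState` name), not the PR's actual class.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the upgrade states named in the LeaderState comment.
// The PR's KRaftVersionUpgrade holds a VoterSet and a KRaftVersion; this sketch uses
// plain voter ids and an int so it compiles on its own.
sealed interface UpgradeState {

    // Starting state when kraft.version is 0: the leader's in-memory voter set,
    // updated as UpdateRaftVoter requests arrive.
    record Voters(Set<Integer> voterIds) implements UpgradeState {}

    // State once an upgrade has been accepted: the pending kraft version.
    record Version(int kraftVersion) implements UpgradeState {}

    // Optional accessors mirroring toVoters()/toVersion() in the quoted diff.
    default Optional<Voters> toVoters() {
        return this instanceof Voters voters ? Optional.of(voters) : Optional.empty();
    }

    default Optional<Version> toVersion() {
        return this instanceof Version version ? Optional.of(version) : Optional.empty();
    }
}
```

In this two-state model an empty `toVersion()` implies a present `toVoters()`. The PR's comment only describes the starting state for kraft version 0, so the real type may have additional states that this sketch does not model.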
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3621,7 +3646,35 @@ public long logEndOffset() {
     @Override
     public KRaftVersion kraftVersion() {
-        return partitionState.lastKraftVersion();
+        if (!isInitialized()) {
+            throw new IllegalStateException("Cannot read the kraft version before the replica has been initialized");
+        }
+
+        return quorum
+            .maybeLeaderState()
+            .flatMap(LeaderState::requestedKRaftVersion)

Review Comment:
   so it might be possible we report a version that we're never able to finish upgrading to?

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2435,8 +2434,18 @@ private boolean handleUpdateVoterResponse(
             responseMetadata.source(),
             currentTimeMs
         );
+        if (handled.isPresent()) {
+            return handled.get();
+        } else if (error == Errors.NONE || error == Errors.UNSUPPORTED_VERSION) {
+            FollowerState follower = quorum.followerStateOrThrow();

Review Comment:
   we're going to throw if the local replica happens to no longer be in the follower state, vs. just ignoring the response?

##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is perform and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch

Review Comment:
   so this is the epoch that the controller was on when the upgrade kraft version request was created? since `validateEpoch` refers to `epoch()` as the current epoch, maybe we should change the wording of the description here

##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is perform and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch
+     * @param newVersion the new kraft version
+     * @param persistedVersion the kraft version persisted to disk
+     * @param persistedVoters the set of voters persisted to disk
+     * @param validateOnly determine if only validation should be performed
+     * @param currentTimeMs the current time
+     */
+    public boolean maybeAppendUpgradedKRaftVersion(
+        int epoch,
+        KRaftVersion newVersion,
+        KRaftVersion persistedVersion,
+        VoterSet persistedVoters,
+        boolean validateOnly,
+        long currentTimeMs
+    ) {
+        validateEpoch(epoch);
+
+        var pendingVersion = kraftVersionUpgradeState.get().toVersion();
+        if (pendingVersion.isPresent()) {
+            if (pendingVersion.get().kraftVersion().equals(newVersion)) {
+                // The version match; upgrade is a noop
+                return false;
+            } else {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Invalid concurrent upgrade of %s from version %s to %s",
+                        KRaftVersion.FEATURE_NAME,
+                        pendingVersion.get(),
+                        newVersion
+                    )
+                );
+            }
+        } else if (persistedVersion.equals(newVersion)) {
+            return false;
+        } else if (persistedVersion.isMoreThan(newVersion)) {
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s because the new version is a downgrade",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            );
+        }
+
+        // Upgrade to kraft.verion 1 is only supported; this needs to change when kraft.version 2 is added
+        var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
+            new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            )
+        );
+        if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to update %s due to missing voters %s compared to %s",

Review Comment:
   "Unable to upgrade version for %s to %s because only voters %s have been updated out of %s"?

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3192,13 +3223,36 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
                 transitionToProspective(currentTimeMs);
                 backoffMs = 0;
             } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) {
+                final boolean resetUpdateVoterTimer;
                 if (partitionState.lastKraftVersion().isReconfigSupported() &&
-                    partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())) {
-                    backoffMs = maybeSendUpdateVoterRequest(state, currentTimeMs);
+                    partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())
+                ) {
+                    // When the cluster supports reconfiguration, send an updated voter configuration
+                    // if the one in the log doesn't match the local configuration.
+                    var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
+                    // Update the request timer if the request was sent
+                    resetUpdateVoterTimer = sendResult.first();
+                    backoffMs = sendResult.second();
+                } else if (!partitionState.lastKraftVersion().isReconfigSupported() &&
+                    !state.hasUpdatedLeader()
+                ) {
+                    // When the cluster doesn't support reconfiguration, the voter needs to send its
+                    // voter information to every new leader. This is because leaders don't persist voter
+                    // information when reconfiguration has not been enabled. The updated voter information
+                    // is required to be able to upgrade the cluster from kraft.version 0.
+                    var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
+                    // Update the request timer if the request was sent
+                    resetUpdateVoterTimer = sendResult.first();
+                    backoffMs = sendResult.second();

Review Comment:
   would it work to simplify these two clauses to the following?
   ```
   if ((partitionState.lastKraftVersion().isReconfigSupported() &&
        partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())) ||
       !state.hasUpdatedLeader()
   ) {
       var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
       resetUpdateVoterTimer = sendResult.first();
       backoffMs = sendResult.second();
   }
   ```

##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
+    /* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
+     * the clients to RaftClient.
+     * 1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in

Review Comment:
   I see, they are implementations of `KRaftVersionUpgrade`... I wonder if there is a way to make it clearer what we are referring to, since the words "Voters" and "Version" are pretty generic. Maybe just:
   `if the kraft version is 0, the starting state is Voters (see KRaftVersionUpgrade for details)`

##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2746,17 +2755,27 @@ private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
     }
     /**
-     * Attempt to send a request. Return the time to wait before the request can be retried.
+     * Attempt to send a request.
+     *
+     * Return if the request was send and the time to wait before the request can be retried.

Review Comment:
   nit: typo `sent`

##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is perform and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch
+     * @param newVersion the new kraft version
+     * @param persistedVersion the kraft version persisted to disk
+     * @param persistedVoters the set of voters persisted to disk
+     * @param validateOnly determine if only validation should be performed
+     * @param currentTimeMs the current time
+     */
+    public boolean maybeAppendUpgradedKRaftVersion(
+        int epoch,
+        KRaftVersion newVersion,
+        KRaftVersion persistedVersion,
+        VoterSet persistedVoters,
+        boolean validateOnly,
+        long currentTimeMs
+    ) {
+        validateEpoch(epoch);
+
+        var pendingVersion = kraftVersionUpgradeState.get().toVersion();
+        if (pendingVersion.isPresent()) {
+            if (pendingVersion.get().kraftVersion().equals(newVersion)) {
+                // The version match; upgrade is a noop
+                return false;
+            } else {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Invalid concurrent upgrade of %s from version %s to %s",
+                        KRaftVersion.FEATURE_NAME,
+                        pendingVersion.get(),
+                        newVersion
+                    )
+                );
+            }
+        } else if (persistedVersion.equals(newVersion)) {
+            return false;
+        } else if (persistedVersion.isMoreThan(newVersion)) {
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s because the new version is a downgrade",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            );
+        }
+
+        // Upgrade to kraft.verion 1 is only supported; this needs to change when kraft.version 2 is added
+        var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
+            new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            )
+        );
+        if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to update %s due to missing voters %s compared to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    inMemoryVoters.voters().voterIds(),
+                    persistedVoters.voterIds()
+                )
+            );
+        } else if (!inMemoryVoters.voters().supportsVersion(newVersion)) {
+            log.info("Not all voters support kraft version {}: {}", newVersion, inMemoryVoters.voters());
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s to %s because not all of the voters support it",
+                    KRaftVersion.FEATURE_NAME,
+                    newVersion
+                )
+            );
+        } else if (
+            inMemoryVoters
+                .voters()
+                .voterKeys()
+                .stream()
+                .anyMatch(voterKey -> voterKey.directoryId().isEmpty())
+        ) {
+            throw new IllegalStateException(
+                String.format(
+                    "Directory id must be known for all of the voters: %s",
+                    inMemoryVoters.voters()
+                )
+            );
+        }
+
+        if (!validateOnly) {
+            /* Note that this only supports upgrades from kraft.version 0 to kraft.version 1. When
+             * kraft.version 2 is added, this logic needs to be revisited
+             */
+            var successful = kraftVersionUpgradeState.compareAndSet(
+                inMemoryVoters,
+                new KRaftVersionUpgrade.Version(newVersion)
+            );
+            if (!successful) {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Unable to upgrade version for %s to %s due to changing voters",

Review Comment:
   `Unable to upgrade version for %s to %s due to a change in the in-memory KRaftVersionUpgrade voter state`?
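On the `compareAndSet` failure message: a hypothetical, simplified sketch of the same pattern, assuming voter ids stand in for `VoterSet` and an `int` stands in for `KRaftVersion`. The upgrade state lives in an `AtomicReference`; in the sketch each `UpdateRaftVoter` request installs a new `Voters` object, so an upgrade that validated an older object loses the `compareAndSet`. `UpgradeSketch`, `updateVoter`, and `upgrade` are illustrative names, not the PR's API, and the persisted-version and directory-id checks are omitted.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the Voters -> Version transition. Voter ids stand in
// for VoterSet and an int stands in for KRaftVersion; the persisted-version and
// directory-id checks from the quoted method are omitted.
final class UpgradeSketch {
    sealed interface State {}
    record Voters(Set<Integer> updatedIds) implements State {}
    record Version(int kraftVersion) implements State {}

    private final Set<Integer> allVoterIds;
    private final AtomicReference<State> state;

    UpgradeSketch(Set<Integer> allVoterIds) {
        this.allVoterIds = Set.copyOf(allVoterIds);
        this.state = new AtomicReference<>(new Voters(Set.of()));
    }

    State current() {
        return state.get();
    }

    // Models the leader handling an UpdateRaftVoter request: every call installs a new
    // Voters object, even if the voter id was already recorded.
    void updateVoter(int voterId) {
        state.updateAndGet(current -> {
            if (current instanceof Voters voters) {
                Set<Integer> ids = new HashSet<>(voters.updatedIds());
                ids.add(voterId);
                return new Voters(Set.copyOf(ids));
            }
            return current; // assumption: updates after the upgrade leave the state alone
        });
    }

    // Validates a previously observed state, then tries to swap in the new version.
    void upgrade(State observed, int newVersion) {
        if (observed instanceof Version pending) {
            if (pending.kraftVersion() == newVersion) {
                return; // the same version is already pending: noop
            }
            throw new IllegalStateException("concurrent upgrade to a different version");
        }
        Voters voters = (Voters) observed;
        if (!voters.updatedIds().equals(allVoterIds)) {
            throw new IllegalStateException(
                "only voters " + voters.updatedIds() + " have been updated out of " + allVoterIds);
        }
        // compareAndSet compares references (==), not equals(), so an equal but newer
        // Voters object installed by updateVoter makes this call fail.
        if (!state.compareAndSet(observed, new Version(newVersion))) {
            throw new IllegalStateException("voter state changed during the upgrade");
        }
    }
}
```

In the sketch, calling `updateVoter` again after the upgrade read the state models a voter re-sending its endpoints: the ids still match, but the reference does not, so the upgrade is rejected instead of overwriting that update.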
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
+    /* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
+     * the clients to RaftClient.
+     * 1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in
+     * the static voter set with the leader updated.
+     * 2. as the leader receives UpdateRaftVoter requests, it updates the associated Voters. Only
+     * after all of the voters have been updated will upgrades successfully complete.

Review Comment:
   nit: `will an upgrade successfully complete.`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org