jsancio commented on code in PR #19416: URL: https://github.com/apache/kafka/pull/19416#discussion_r2050851544
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -416,14 +476,192 @@ public void requestResign() { this.resignRequested = true; } + /** + * Upgrade the kraft version. + * + * This methods upgradeds the kraft version to {@code newVersion}. If the version is already + * {@code newVersion}, this is a noop operation. + * + * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent + * kraft version {@code persistedVersion}. + * + * For the upgrade to succeed all of the voters in the voter set must support the new kraft + * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch + * with one control record setting the kraft version to 1 and one voters record setting the + * updated voter set. + * + * When {@code validateOnly} is true only the validation is perform and the control records are + * not generated. + * + * @param epoch the current epoch + * @param newVersion the new kraft version + * @param persistedVersion the kraft version persisted to disk + * @param persistedVoters the set of voters persisted to disk + * @param validateOnly determine if only validation should be performed + * @param currentTimeMs the current time + */ + public boolean maybeAppendUpgradedKRaftVersion( + int epoch, + KRaftVersion newVersion, + KRaftVersion persistedVersion, + VoterSet persistedVoters, + boolean validateOnly, + long currentTimeMs + ) { + validateEpoch(epoch); + + var pendingVersion = kraftVersionUpgradeState.get().toVersion(); + if (pendingVersion.isPresent()) { + if (pendingVersion.get().kraftVersion().equals(newVersion)) { + // The version match; upgrade is a noop + return false; + } else { + throw new InvalidUpdateVersionException( + String.format( + "Invalid concurrent upgrade of %s from version %s to %s", + KRaftVersion.FEATURE_NAME, + pendingVersion.get(), + newVersion + ) + ); + } + } else if (persistedVersion.equals(newVersion)) { + return false; + } else if 
(persistedVersion.isMoreThan(newVersion)) { + throw new InvalidUpdateVersionException( + String.format( + "Invalid upgrade of %s from version %s to %s because the new version is a downgrade", + KRaftVersion.FEATURE_NAME, + persistedVersion, + newVersion + ) + ); + } + + // Upgrade to kraft.verion 1 is only supported; this needs to change when kraft.version 2 is added + var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() -> + new InvalidUpdateVersionException( + String.format( + "Invalid upgrade of %s from version %s to %s", + KRaftVersion.FEATURE_NAME, + persistedVersion, + newVersion + ) + ) + ); + if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) { + throw new IllegalStateException( + String.format( + "Unable to update %s due to missing voters %s compared to %s", Review Comment: I changed the message to include the new version. I didn't change the rest of the phrase because it wasn't 100% accurate. This check is a programming error if it fails, hence the IllegalStateException. It is expected that the in-memory voter ids are always equal to the static voter ids. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org