ahuang98 commented on code in PR #19416: URL: https://github.com/apache/kafka/pull/19416#discussion_r2051230280
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -416,14 +476,192 @@ public void requestResign() { this.resignRequested = true; } + /** + * Upgrade the kraft version. + * + * This methods upgradeds the kraft version to {@code newVersion}. If the version is already + * {@code newVersion}, this is a noop operation. + * + * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent + * kraft version {@code persistedVersion}. + * + * For the upgrade to succeed all of the voters in the voter set must support the new kraft + * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch + * with one control record setting the kraft version to 1 and one voters record setting the + * updated voter set. + * + * When {@code validateOnly} is true only the validation is perform and the control records are + * not generated. + * + * @param epoch the current epoch + * @param newVersion the new kraft version + * @param persistedVersion the kraft version persisted to disk + * @param persistedVoters the set of voters persisted to disk + * @param validateOnly determine if only validation should be performed + * @param currentTimeMs the current time + */ + public boolean maybeAppendUpgradedKRaftVersion( + int epoch, + KRaftVersion newVersion, + KRaftVersion persistedVersion, + VoterSet persistedVoters, + boolean validateOnly, + long currentTimeMs + ) { + validateEpoch(epoch); + + var pendingVersion = kraftVersionUpgradeState.get().toVersion(); + if (pendingVersion.isPresent()) { + if (pendingVersion.get().kraftVersion().equals(newVersion)) { + // The version match; upgrade is a noop + return false; + } else { + throw new InvalidUpdateVersionException( + String.format( + "Invalid concurrent upgrade of %s from version %s to %s", + KRaftVersion.FEATURE_NAME, + pendingVersion.get(), + newVersion + ) + ); + } + } else if (persistedVersion.equals(newVersion)) { + return false; + } else if (persistedVersion.isMoreThan(newVersion)) { + throw new InvalidUpdateVersionException( + String.format( + "Invalid upgrade of %s from version %s to %s because the new version is a downgrade", + KRaftVersion.FEATURE_NAME, + persistedVersion, + newVersion + ) + ); + } + + // Upgrade to kraft.verion 1 is only supported; this needs to change when kraft.version 2 is added + var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() -> + new InvalidUpdateVersionException( + String.format( + "Invalid upgrade of %s from version %s to %s", + KRaftVersion.FEATURE_NAME, + persistedVersion, + newVersion + ) + ) + ); + if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) { + throw new IllegalStateException( + String.format( + "Unable to update %s due to missing voters %s compared to %s", + KRaftVersion.FEATURE_NAME, + inMemoryVoters.voters().voterIds(), + persistedVoters.voterIds() + ) + ); + } else if (!inMemoryVoters.voters().supportsVersion(newVersion)) { + log.info("Not all voters support kraft version {}: {}", newVersion, inMemoryVoters.voters()); + throw new InvalidUpdateVersionException( + String.format( + "Invalid upgrade of %s to %s because not all of the voters support it", + KRaftVersion.FEATURE_NAME, + newVersion + ) + ); + } else if ( + inMemoryVoters + .voters() + .voterKeys() + .stream() + .anyMatch(voterKey -> voterKey.directoryId().isEmpty()) + ) { + throw new IllegalStateException( + String.format( + "Directory id must be known for all of the voters: %s", + inMemoryVoters.voters() + ) + ); + } + + if (!validateOnly) { + /* Note that this only supports upgrades from kraft.version 0 to kraft.version 1. When + * kraft.version 2 is added, this logic needs to be revisited + */ + var successful = kraftVersionUpgradeState.compareAndSet( + inMemoryVoters, + new KRaftVersionUpgrade.Version(newVersion) + ); + if (!successful) { + throw new InvalidUpdateVersionException( + String.format( + "Unable to upgrade version for %s to %s due to changing voters", Review Comment: I thought "changing voters" might be a bit ambiguous as well, but can't think of any better wording -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org