jsancio commented on code in PR #19416:
URL: https://github.com/apache/kafka/pull/19416#discussion_r2050852840


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
 
+    /**
+     * Upgrade the kraft version.
+     *
+     * This method upgrades the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to the current
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generates one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is performed and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch
+     * @param newVersion the new kraft version
+     * @param persistedVersion the kraft version persisted to disk
+     * @param persistedVoters the set of voters persisted to disk
+     * @param validateOnly determine if only validation should be performed
+     * @param currentTimeMs the current time
+     */
+    public boolean maybeAppendUpgradedKRaftVersion(
+        int epoch,
+        KRaftVersion newVersion,
+        KRaftVersion persistedVersion,
+        VoterSet persistedVoters,
+        boolean validateOnly,
+        long currentTimeMs
+    ) {
+        validateEpoch(epoch);
+
+        var pendingVersion = kraftVersionUpgradeState.get().toVersion();
+        if (pendingVersion.isPresent()) {
+            if (pendingVersion.get().kraftVersion().equals(newVersion)) {
+                // The version match; upgrade is a noop
+                return false;
+            } else {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Invalid concurrent upgrade of %s from version %s to %s",
+                        KRaftVersion.FEATURE_NAME,
+                        pendingVersion.get(),
+                        newVersion
+                    )
+                );
+            }
+        } else if (persistedVersion.equals(newVersion)) {
+            return false;
+        } else if (persistedVersion.isMoreThan(newVersion)) {
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s because the new version is a downgrade",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            );
+        }
+
+        // Only the upgrade to kraft.version 1 is supported; this needs to change when kraft.version 2 is added
+        var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
+            new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            )
+        );
+        if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to update %s due to missing voters %s compared to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    inMemoryVoters.voters().voterIds(),
+                    persistedVoters.voterIds()
+                )
+            );
+        } else if (!inMemoryVoters.voters().supportsVersion(newVersion)) {
+            log.info("Not all voters support kraft version {}: {}", newVersion, inMemoryVoters.voters());
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s to %s because not all of the voters support it",
+                    KRaftVersion.FEATURE_NAME,
+                    newVersion
+                )
+            );
+        } else if (
+            inMemoryVoters
+                .voters()
+                .voterKeys()
+                .stream()
+                .anyMatch(voterKey -> voterKey.directoryId().isEmpty())
+        ) {
+            throw new IllegalStateException(
+                String.format(
+                    "Directory id must be known for all of the voters: %s",
+                    inMemoryVoters.voters()
+                )
+            );
+        }
+
+        if (!validateOnly) {
+            /* Note that this only supports upgrades from kraft.version 0 to kraft.version 1. When
+             * kraft.version 2 is added, this logic needs to be revisited
+             */
+            var successful = kraftVersionUpgradeState.compareAndSet(
+                inMemoryVoters,
+                new KRaftVersionUpgrade.Version(newVersion)
+            );
+            if (!successful) {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Unable to upgrade version for %s to %s due to changing voters",

Review Comment:
   This message is returned to the user. I don't think the user is going to 
know what "in memory KRaftVersionUpgrade" means.
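
   For illustration only, here is a rough sketch of wording that avoids internal 
class names and tells the user what happened and what to do next. The class 
`UpgradeMessageSketch`, the helper `changedVotersMessage`, and the message text 
are all hypothetical and not part of the PR; the feature name is passed in as a 
plain string so the sketch compiles on its own.

```java
public class UpgradeMessageSketch {
    // Hypothetical helper, not part of the PR: formats a user-facing error that
    // describes the failure without mentioning internal types.
    static String changedVotersMessage(String featureName, int newVersion) {
        return String.format(
            "Unable to upgrade %s to %d because the set of voters changed while "
                + "the upgrade was being processed; retry the upgrade",
            featureName,
            newVersion
        );
    }

    public static void main(String[] args) {
        // Example values only; in the PR the feature name would come from
        // KRaftVersion.FEATURE_NAME and the version from newVersion.
        System.out.println(changedVotersMessage("kraft.version", 1));
    }
}
```

   The idea is to describe the failure in terms the user can act on (the voter 
set changed, so retry) instead of naming the in-memory state.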



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
