ahuang98 commented on code in PR #19416:
URL: https://github.com/apache/kafka/pull/19416#discussion_r2047787462


##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -97,11 +105,16 @@ public FeatureControlManager build() {
                         MetadataVersion.latestProduction().featureLevel()));
                 quorumFeatures = new QuorumFeatures(0, localSupportedFeatures, List.of(0));
             }
+            if (raftClient == null) {
+                throw new IllegalStateException("Must specify and raft client");

Review Comment:
   nit: "a raft client"



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
 
+    /* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
+     * the clients to RaftClient.
+     *  1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in

Review Comment:
   I'm thrown off a bit by the capitalization. Are `Voters` and `Version` meant to reference variables in LeaderState that I'm just missing?



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
 
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already

Review Comment:
   nit: upgrades



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2435,8 +2434,18 @@ private boolean handleUpdateVoterResponse(
             responseMetadata.source(),
             currentTimeMs
         );
+        if (handled.isPresent()) {

Review Comment:
   Question about the current logic on L2415:
   ```
   if (responseLeaderId.isPresent() && data.currentLeader().host().isEmpty()) {
   ```
   I'm confused about why we seem to ignore the case where `responseLeaderId.isPresent() && !data.currentLeader().host().isEmpty()`. Why do we set leaderEndpoints to empty in that case?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3621,7 +3646,35 @@ public long logEndOffset() {
 
     @Override
     public KRaftVersion kraftVersion() {
-        return partitionState.lastKraftVersion();
+        if (!isInitialized()) {
+            throw new IllegalStateException("Cannot read the kraft version before the replica has been initialized");
+        }
+
+        return quorum
+            .maybeLeaderState()
+            .flatMap(LeaderState::requestedKRaftVersion)

Review Comment:
   So it might be possible that we report a version that we're never able to finish upgrading to?
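
One way to read the concern above: if the accessor prefers the leader's requested version over the persisted one, callers see the new version as soon as the upgrade is requested. A minimal sketch, assuming the truncated chain in the hunk ends by falling back to the persisted version. The `Leader` class, the `int` versions, and `reportedVersion` are hypothetical stand-ins, not the Kafka types:

```java
import java.util.Optional;

final class ReportedVersionSketch {
    // Hypothetical stand-in for LeaderState: it only tracks a requested (pending) version.
    static final class Leader {
        private final Integer requestedVersion; // null when no upgrade has been requested

        Leader(Integer requestedVersion) {
            this.requestedVersion = requestedVersion;
        }

        Optional<Integer> requestedKRaftVersion() {
            return Optional.ofNullable(requestedVersion);
        }
    }

    // Prefer the leader's requested version; otherwise fall back to the persisted version.
    // The fallback is an assumption, because the quoted diff is cut off before that point.
    static int reportedVersion(Optional<Leader> maybeLeader, int persistedVersion) {
        return maybeLeader
            .flatMap(Leader::requestedKRaftVersion)
            .orElse(persistedVersion);
    }

    public static void main(String[] args) {
        // Not a leader: the persisted version is reported.
        System.out.println(reportedVersion(Optional.empty(), 0));
        // Leader without a pending upgrade: the persisted version is reported.
        System.out.println(reportedVersion(Optional.of(new Leader(null)), 0));
        // Leader with a pending upgrade: the requested version is reported.
        System.out.println(reportedVersion(Optional.of(new Leader(1)), 0));
    }
}
```

In the last call the sketch reports the requested version while the persisted version is still 0, which is the situation the question is asking about.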



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2435,8 +2434,18 @@ private boolean handleUpdateVoterResponse(
             responseMetadata.source(),
             currentTimeMs
         );
+        if (handled.isPresent()) {
+            return handled.get();
+        } else if (error == Errors.NONE || error == Errors.UNSUPPORTED_VERSION) {
+            FollowerState follower = quorum.followerStateOrThrow();

Review Comment:
   So we're going to throw if the local replica happens to no longer be in the follower state, rather than just ignoring the response?



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
 
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is perform and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch

Review Comment:
   So this is the epoch that the controller was on when the upgrade kraft version request was created?
   
   Since `validateEpoch` refers to `epoch()` as the current epoch, maybe we should change the wording of the description here.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
 
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is perform and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch
+     * @param newVersion the new kraft version
+     * @param persistedVersion the kraft version persisted to disk
+     * @param persistedVoters the set of voters persisted to disk
+     * @param validateOnly determine if only validation should be performed
+     * @param currentTimeMs the current time
+     */
+    public boolean maybeAppendUpgradedKRaftVersion(
+        int epoch,
+        KRaftVersion newVersion,
+        KRaftVersion persistedVersion,
+        VoterSet persistedVoters,
+        boolean validateOnly,
+        long currentTimeMs
+    ) {
+        validateEpoch(epoch);
+
+        var pendingVersion = kraftVersionUpgradeState.get().toVersion();
+        if (pendingVersion.isPresent()) {
+            if (pendingVersion.get().kraftVersion().equals(newVersion)) {
+                // The version match; upgrade is a noop
+                return false;
+            } else {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Invalid concurrent upgrade of %s from version %s to %s",
+                        KRaftVersion.FEATURE_NAME,
+                        pendingVersion.get(),
+                        newVersion
+                    )
+                );
+            }
+        } else if (persistedVersion.equals(newVersion)) {
+            return false;
+        } else if (persistedVersion.isMoreThan(newVersion)) {
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s because the new version is a downgrade",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            );
+        }
+
+        // Upgrade to kraft.verion 1 is only supported; this needs to change when kraft.version 2 is added
+        var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
+            new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            )
+        );
+        if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to update %s due to missing voters %s compared to %s",

Review Comment:
   "Unable to upgrade version for %s to %s because only voters %s have been 
updated out of %s"?
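
A note on adopting the wording above: it has four `%s` placeholders, while the `String.format` call quoted in the longer hunk of this thread passes three arguments (the feature name and the two voter id sets). A minimal sketch of what happens either way, using made-up sample values. The class and method names are hypothetical, and `"kraft.version"` is assumed to be the value of `KRaftVersion.FEATURE_NAME`:

```java
import java.util.List;
import java.util.MissingFormatArgumentException;

final class UpgradeMessageSketch {
    // The wording proposed in the review comment above.
    static final String SUGGESTED =
        "Unable to upgrade version for %s to %s because only voters %s have been updated out of %s";

    // Four arguments: feature name, target version, updated voters, all voters.
    static String withFourArgs(String feature, Object newVersion, Object updated, Object all) {
        return String.format(SUGGESTED, feature, newVersion, updated, all);
    }

    // Three arguments, as in the existing call: one placeholder is left without a value.
    static boolean threeArgsThrow(String feature, Object updated, Object all) {
        try {
            String.format(SUGGESTED, feature, updated, all);
            return false;
        } catch (MissingFormatArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(withFourArgs("kraft.version", 1, List.of(1, 2), List.of(1, 2, 3)));
        System.out.println("three args throw: " + threeArgsThrow("kraft.version", List.of(1, 2), List.of(1, 2, 3)));
    }
}
```

So the new message would need `newVersion` added to the argument list alongside the existing three.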



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3192,13 +3223,36 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
             transitionToProspective(currentTimeMs);
             backoffMs = 0;
         } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) {
+            final boolean resetUpdateVoterTimer;
             if (partitionState.lastKraftVersion().isReconfigSupported() &&
-                partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())) {
-                backoffMs = maybeSendUpdateVoterRequest(state, currentTimeMs);
+                partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())
+            ) {
+                // When the cluster supports reconfiguration, send an updated voter configuration
+                // if the one in the log doesn't match the local configuration.
+                var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
+                // Update the request timer if the request was sent
+                resetUpdateVoterTimer = sendResult.first();
+                backoffMs = sendResult.second();
+            } else if (!partitionState.lastKraftVersion().isReconfigSupported() &&
+                !state.hasUpdatedLeader()
+            ) {
+                // When the cluster doesn't support reconfiguration, the voter needs to send its
+                // voter information to every new leader. This is because leaders don't persist voter
+                // information when reconfiguration has not been enabled. The updated voter information
+                // is required to be able to upgrade the cluster from kraft.version 0.
+                var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
+                // Update the request timer if the request was sent
+                resetUpdateVoterTimer = sendResult.first();
+                backoffMs = sendResult.second();

Review Comment:
   Would it work to simplify these two clauses to the following?
   ```
   if ((partitionState.lastKraftVersion().isReconfigSupported() &&
           partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())) ||
       !state.hasUpdatedLeader()
   ) {
       var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
       resetUpdateVoterTimer = sendResult.first();
       backoffMs = sendResult.second();
   }
   ```
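
To check the proposed simplification against the two branches in the diff, the conditions can be reduced to three booleans and compared over all eight combinations. This is a sketch only: `reconfig`, `needsUpdate`, and `updatedLeader` are stand-ins for `isReconfigSupported()`, `voterNodeNeedsUpdate(...)`, and `hasUpdatedLeader()`, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class UpdateVoterCondition {
    // True when either of the two branches in the diff would send the request.
    static boolean original(boolean reconfig, boolean needsUpdate, boolean updatedLeader) {
        return (reconfig && needsUpdate) || (!reconfig && !updatedLeader);
    }

    // True when the single merged branch from the review comment would send the request.
    static boolean simplified(boolean reconfig, boolean needsUpdate, boolean updatedLeader) {
        return (reconfig && needsUpdate) || !updatedLeader;
    }

    // Lists every input combination where the two conditions disagree.
    static List<String> differences() {
        List<String> result = new ArrayList<>();
        boolean[] values = {false, true};
        for (boolean reconfig : values) {
            for (boolean needsUpdate : values) {
                for (boolean updatedLeader : values) {
                    boolean before = original(reconfig, needsUpdate, updatedLeader);
                    boolean after = simplified(reconfig, needsUpdate, updatedLeader);
                    if (before != after) {
                        result.add(String.format(
                            "reconfig=%b needsUpdate=%b updatedLeader=%b",
                            reconfig, needsUpdate, updatedLeader));
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        differences().forEach(System.out::println);
    }
}
```

The two forms disagree in exactly one case: reconfiguration is supported, the voter node does not need an update, and the leader has not been updated. There the merged condition sends a request and the original does not. Whether that case can occur depends on how `hasUpdatedLeader()` is maintained when reconfiguration is supported, which the quoted diff does not show.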



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
 
+    /* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
+     * the clients to RaftClient.
+     *  1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in

Review Comment:
   I see, they are implementations of `KRaftVersionUpgrade`. I wonder if there is a way to make it clearer what we are referring to, since the words "Voters" and "Version" are pretty generic.
   
   Maybe just: `if the kraft version is 0, the starting state is Voters (see KRaftVersionUpgrade for details)`



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2746,17 +2755,27 @@ private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
     }
 
     /**
-     * Attempt to send a request. Return the time to wait before the request can be retried.
+     * Attempt to send a request.
+     *
+     * Return if the request was send and the time to wait before the request can be retried.

Review Comment:
   nit: typo `sent`



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -416,14 +476,192 @@ public void requestResign() {
         this.resignRequested = true;
     }
 
+    /**
+     * Upgrade the kraft version.
+     *
+     * This methods upgradeds the kraft version to {@code newVersion}. If the version is already
+     * {@code newVersion}, this is a noop operation.
+     *
+     * KRaft only supports upgrades, so {@code newVersion} must be greater than or equal to curent
+     * kraft version {@code persistedVersion}.
+     *
+     * For the upgrade to succeed all of the voters in the voter set must support the new kraft
+     * version. The upgrade from kraft version 0 to kraft version 1 generate one control batch
+     * with one control record setting the kraft version to 1 and one voters record setting the
+     * updated voter set.
+     *
+     * When {@code validateOnly} is true only the validation is perform and the control records are
+     * not generated.
+     *
+     * @param epoch the current epoch
+     * @param newVersion the new kraft version
+     * @param persistedVersion the kraft version persisted to disk
+     * @param persistedVoters the set of voters persisted to disk
+     * @param validateOnly determine if only validation should be performed
+     * @param currentTimeMs the current time
+     */
+    public boolean maybeAppendUpgradedKRaftVersion(
+        int epoch,
+        KRaftVersion newVersion,
+        KRaftVersion persistedVersion,
+        VoterSet persistedVoters,
+        boolean validateOnly,
+        long currentTimeMs
+    ) {
+        validateEpoch(epoch);
+
+        var pendingVersion = kraftVersionUpgradeState.get().toVersion();
+        if (pendingVersion.isPresent()) {
+            if (pendingVersion.get().kraftVersion().equals(newVersion)) {
+                // The version match; upgrade is a noop
+                return false;
+            } else {
+                throw new InvalidUpdateVersionException(
+                    String.format(
+                        "Invalid concurrent upgrade of %s from version %s to %s",
+                        KRaftVersion.FEATURE_NAME,
+                        pendingVersion.get(),
+                        newVersion
+                    )
+                );
+            }
+        } else if (persistedVersion.equals(newVersion)) {
+            return false;
+        } else if (persistedVersion.isMoreThan(newVersion)) {
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s because the new version is a downgrade",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            );
+        }
+
+        // Upgrade to kraft.verion 1 is only supported; this needs to change when kraft.version 2 is added
+        var inMemoryVoters = kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
+            new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s from version %s to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    persistedVersion,
+                    newVersion
+                )
+            )
+        );
+        if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) {
+            throw new IllegalStateException(
+                String.format(
+                    "Unable to update %s due to missing voters %s compared to %s",
+                    KRaftVersion.FEATURE_NAME,
+                    inMemoryVoters.voters().voterIds(),
+                    persistedVoters.voterIds()
+                )
+            );
+        } else if (!inMemoryVoters.voters().supportsVersion(newVersion)) {
+            log.info("Not all voters support kraft version {}: {}", newVersion, inMemoryVoters.voters());
+            throw new InvalidUpdateVersionException(
+                String.format(
+                    "Invalid upgrade of %s to %s because not all of the voters support it",
+                    KRaftVersion.FEATURE_NAME,
+                    newVersion
+                )
+            );
+        } else if (
+            inMemoryVoters
+                .voters()
+                .voterKeys()
+                .stream()
+                .anyMatch(voterKey -> voterKey.directoryId().isEmpty())
+        ) {
+            throw new IllegalStateException(
+                String.format(
+                    "Directory id must be known for all of the voters: %s",
+                    inMemoryVoters.voters()
+                )
+            );
+        }
+
+        if (!validateOnly) {
+            /* Note that this only supports upgrades from kraft.version 0 to kraft.version 1. When
+             * kraft.version 2 is added, this logic needs to be revisited
+             */
+            var successful = kraftVersionUpgradeState.compareAndSet(
+                inMemoryVoters,
+                new KRaftVersionUpgrade.Version(newVersion)
+            );
+            if (!successful) {
+                throw new InvalidUpdateVersionException(
+                        String.format(
+                            "Unable to upgrade version for %s to %s due to changing voters",

Review Comment:
   `Unable to upgrade version for %s to %s due to a change in the in memory KRaftVersionUpgrade voter state`?
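
For context on why the `compareAndSet` in the hunk above can fail: `AtomicReference.compareAndSet` compares by reference identity, so the swap to the `Version` state only succeeds if the `Voters` object read at the start of the method is still the one installed. A minimal sketch of that pattern follows. `UpgradeState`, `Voters`, `Version`, and the helper methods are simplified, hypothetical stand-ins for `KRaftVersionUpgrade` and its implementations, which are not shown in this thread.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

final class UpgradeStateSketch {
    // Hypothetical stand-in for KRaftVersionUpgrade: a state is either Voters or Version.
    interface UpgradeState {
        default Optional<Voters> toVoters() { return Optional.empty(); }
        default Optional<Version> toVersion() { return Optional.empty(); }
    }

    static final class Voters implements UpgradeState {
        final Set<Integer> voterIds;
        Voters(Set<Integer> voterIds) { this.voterIds = voterIds; }
        @Override public Optional<Voters> toVoters() { return Optional.of(this); }
    }

    static final class Version implements UpgradeState {
        final int kraftVersion;
        Version(int kraftVersion) { this.kraftVersion = kraftVersion; }
        @Override public Optional<Version> toVersion() { return Optional.of(this); }
    }

    private final AtomicReference<UpgradeState> state;

    UpgradeStateSketch(Set<Integer> voterIds) {
        this.state = new AtomicReference<>(new Voters(voterIds));
    }

    // Simulates a voter update: installs a new Voters object in place of the current one.
    boolean updateVoters(Set<Integer> voterIds) {
        UpgradeState current = state.get();
        if (current.toVoters().isEmpty()) {
            return false; // already upgraded; nothing to update
        }
        return state.compareAndSet(current, new Voters(voterIds));
    }

    // Reads the Voters state, runs the hook (to simulate interleaving), then swaps to Version.
    boolean upgrade(int newVersion, Runnable betweenReadAndSwap) {
        Optional<Voters> observed = state.get().toVoters();
        if (observed.isEmpty()) {
            return false;
        }
        betweenReadAndSwap.run();
        return state.compareAndSet(observed.get(), new Version(newVersion));
    }

    Optional<Integer> pendingVersion() {
        return state.get().toVersion().map(v -> v.kraftVersion);
    }
}
```

In this sketch, `upgrade(1, () -> {})` succeeds, while `upgrade(1, () -> sketch.updateVoters(sameIds))` fails even though the voter ids are unchanged, because the `Voters` object was replaced between the read and the swap. That is the situation the error message is describing.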



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
     // This is volatile because resignation can be requested from an external thread.
     private volatile boolean resignRequested = false;
 
+    /* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
+     * the clients to RaftClient.
+     *  1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in
+     *     the static voter set with the leader updated.
+     *  2. as the leader receives UpdateRaftVoter requests, it updates the associated Voters. Only
+     *     after all of the voters have been updated will upgrades successfully complete.

Review Comment:
   nit: `will an upgrade successfully complete.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
