kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1980466882


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1457,83 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of the following
+                * voter sets has reached the high watermark:
+                * 1. the leader's voter set at the HWM (i.e. the last committed voter set)
+                * 2. the leader's lastVoterSet() (which may or may not be committed)
+                * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted,
+                * which follows from the AtMostOneUncommittedVoterSet invariant.
+                *
+                * A more elaborate check is necessary for this invariant because this method can get called after the
+                * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has
+                * updated its high watermark using the new voter set. In this case, we need to check that the majority
+                * of the last committed voter set has reached the current high watermark.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+                        VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+                            leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {
+                                long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds());
+                                assertTrue(
+                                    nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()),
+                                    "Insufficient nodes have reached current high watermark. Expected at least " +
+                                        cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark);
+                            });
+                            return;
+                        }
+                        assertTrue(
+                            numReachedHighWatermark >= cluster.majoritySize(voterSet.size()),
+                            "Insufficient nodes have reached current high watermark. Expected at least " +
+                                cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark);
+                    });
+                });
+            } else {
+                cluster.leaderHighWatermark().ifPresent(highWatermark -> {
+                    long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet());
+                    assertTrue(
+                        numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()),
+                        "Insufficient nodes have reached current high watermark");
+                });
+            }
+        }
+
+        private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) {
+            return cluster.persistentStates.entrySet().stream()
+                .filter(entry -> voterIds.contains(entry.getKey()))
+                .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
+                .count();
+        }
+    }
+
+    private static class AtMostOneUncommittedVotersRecord implements Invariant {
+        final Cluster cluster;
+
+        private AtMostOneUncommittedVotersRecord(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        @Override
+        public void verify() {
+            cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> {
+                    Optional<LogEntry> uncommittedVotersEntry = Optional.empty();

Review Comment:
   Yeah probably.



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1457,83 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of the following
+                * voter sets has reached the high watermark:
+                * 1. the leader's voter set at the HWM (i.e. the last committed voter set)
+                * 2. the leader's lastVoterSet() (which may or may not be committed)
+                * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted,
+                * which follows from the AtMostOneUncommittedVoterSet invariant.
+                *
+                * A more elaborate check is necessary for this invariant because this method can get called after the
+                * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has
+                * updated its high watermark using the new voter set. In this case, we need to check that the majority
+                * of the last committed voter set has reached the current high watermark.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+                        VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+                            leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {
+                                long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds());
+                                assertTrue(
+                                    nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()),
+                                    "Insufficient nodes have reached current high watermark. Expected at least " +
+                                        cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark);
+                            });
+                            return;
+                        }
+                        assertTrue(
+                            numReachedHighWatermark >= cluster.majoritySize(voterSet.size()),
+                            "Insufficient nodes have reached current high watermark. Expected at least " +
+                                cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark);
+                    });
+                });
+            } else {
+                cluster.leaderHighWatermark().ifPresent(highWatermark -> {
+                    long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet());
+                    assertTrue(
+                        numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()),
+                        "Insufficient nodes have reached current high watermark");
+                });
+            }
+        }
+
+        private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) {
+            return cluster.persistentStates.entrySet().stream()
+                .filter(entry -> voterIds.contains(entry.getKey()))
+                .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
+                .count();
+        }
+    }
+
+    private static class AtMostOneUncommittedVotersRecord implements Invariant {
+        final Cluster cluster;
+
+        private AtMostOneUncommittedVotersRecord(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        @Override
+        public void verify() {
+            cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> {
+                    Optional<LogEntry> uncommittedVotersEntry = Optional.empty();

Review Comment:
   Yeah probably.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to