ahuang98 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1980430607


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddVotersIfMajorityIsReachable(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 3) int numObservers
+    ) {
+        int numVoters = 5;
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition the nodes into two sets. Nodes are reachable within each set,
+        // but the two sets cannot communicate with each other. We should be able
+        // to make progress even if an election is needed in the larger set.
+        int firstObserverId = numVoters;
+        router.filter(
+            0,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(
+            1,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+
+        long partitionLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset));
+
+        long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1));
+        long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4));
+
+        assertTrue(
+            majorityHighWatermark > minorityHighWatermark,
+            String.format(
+                "majorityHighWatermark = %s, minorityHighWatermark = %s",
+                majorityHighWatermark,
+                minorityHighWatermark
+            )
+        );
+
+        // Verify we can add a voter, since a majority is still reachable
+        addVoter(cluster, scheduler, firstObserverId, expectedVoterIds);

Review Comment:
   should expectedVoterIds include the observer you're trying to add?
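
   If the intent is that `expectedVoterIds` ends up describing the voter set after the add completes, one possible reading (a sketch only; it assumes `addVoter` verifies against the passed-in set rather than updating it itself) would be:
   ```java
   // Hypothetical adjustment: include the node being promoted so the expected
   // set matches the voter set once the add has been committed.
   expectedVoterIds.add(firstObserverId);
   addVoter(cluster, scheduler, firstObserverId, expectedVoterIds);
   ```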



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +126,58 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
     ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddAndRemoveVoters(
+        @ForAll int seed
+    ) {
+        Random random = new Random(seed);
+        int numVoters = 3;
+        int numObservers = 2;
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Add all observers to voter set one by one
+        for (int voterIdToAdd = numVoters; voterIdToAdd < numVoters + numObservers; voterIdToAdd++) {
+            addVoter(cluster, scheduler, voterIdToAdd, expectedVoterIds);
+        }
+        runUntilNewHighWatermark(cluster, scheduler);

Review Comment:
   what is the purpose of this check? highWatermark + 10 seems a bit arbitrary
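
   For context, the helper being questioned presumably looks roughly like the sketch below (not the PR's actual code, just an assumed shape; the `+ 10` margin is the part that reads as arbitrary):
   ```java
   // Assumed shape of runUntilNewHighWatermark, for illustration only: wait
   // until some node advances its high watermark by a fixed margin.
   private void runUntilNewHighWatermark(Cluster cluster, EventScheduler scheduler) {
       long currentHighWatermark = cluster.maxHighWatermarkReached(cluster.initialVoters.keySet());
       scheduler.runUntil(() -> cluster.anyReachedHighWatermark(currentHighWatermark + 10));
   }
   ```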



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddVotersIfMajorityIsReachable(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 3) int numObservers
+    ) {
+        int numVoters = 5;
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition the nodes into two sets. Nodes are reachable within each set,
+        // but the two sets cannot communicate with each other. We should be able
+        // to make progress even if an election is needed in the larger set.
+        int firstObserverId = numVoters;
+        router.filter(
+            0,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(
+            1,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+
+        long partitionLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset));
+
+        long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1));
+        long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4));
+
+        assertTrue(

Review Comment:
   what's the point of the assertion? seems a bit inexact (i.e. you should know 
the exact max HW that the smaller partition has because it cannot have changed 
after the filters were applied?)
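
   If an exact check is preferable, a sketch along these lines (assuming, per the above, that the minority's high watermark cannot move once the filters are installed) would pin the value down:
   ```java
   // Capture the minority's high watermark right after the partition is applied.
   long minorityHighWatermarkAtPartition = cluster.maxHighWatermarkReached(Set.of(0, 1));

   // ... run the scheduler until the majority side makes progress ...

   // Exact assertions: the minority side is frozen, and the majority moved past it.
   assertEquals(minorityHighWatermarkAtPartition, cluster.maxHighWatermarkReached(Set.of(0, 1)));
   assertTrue(cluster.maxHighWatermarkReached(Set.of(2, 3, 4)) > minorityHighWatermarkAtPartition);
   ```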



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -602,6 +858,14 @@ void schedule(Runnable action, int delayMs, int periodMs, int jitterMs) {
             queue.offer(event);
         }
 
+        void scheduleUntil(Runnable action, Supplier<Boolean> exitCondition, int delayMs, int periodMs, int jitterMs) {
+            long initialDeadlineMs = time.milliseconds() + delayMs;
+            int eventId = eventIdGenerator.incrementAndGet();
+            ScheduledUntilConditionMetEvent event = new ScheduledUntilConditionMetEvent(action, eventId, random, initialDeadlineMs, periodMs, jitterMs, exitCondition);
+            queue.offer(event);
+

Review Comment:
   nit: extra new line



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -475,6 +537,104 @@ void canRecoverFromSingleNodeCommittedDataLoss(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddObserverAndRemovePartitionedVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, 1, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition a random voter, and add an observer to replace it
+        RaftNode voterToRemove = cluster.running.get(random.nextInt(numVoters));
+        router.filter(voterToRemove.id(), new DropAllTraffic());
+        addVoter(cluster, scheduler, numVoters, expectedVoterIds);

Review Comment:
   same question here, does expectedVoterIds need to include the observer you 
are adding?



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddVotersIfMajorityIsReachable(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 3) int numObservers
+    ) {
+        int numVoters = 5;
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition the nodes into two sets. Nodes are reachable within each set,
+        // but the two sets cannot communicate with each other. We should be able
+        // to make progress even if an election is needed in the larger set.
+        int firstObserverId = numVoters;
+        router.filter(
+            0,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(
+            1,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+
+        long partitionLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset));
+
+        long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1));
+        long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4));
+
+        assertTrue(
+            majorityHighWatermark > minorityHighWatermark,
+            String.format(
+                "majorityHighWatermark = %s, minorityHighWatermark = %s",
+                majorityHighWatermark,
+                minorityHighWatermark
+            )
+        );
+
+        // Verify we can add a voter, since a majority is still reachable
+        addVoter(cluster, scheduler, firstObserverId, expectedVoterIds);
+
+        // Now restore the partition and verify everyone catches up
+        router.filter(0, new PermitAllTraffic());
+        router.filter(1, new PermitAllTraffic());
+        router.filter(2, new PermitAllTraffic());
+        router.filter(3, new PermitAllTraffic());
+        router.filter(4, new PermitAllTraffic());
+        router.filter(firstObserverId, new PermitAllTraffic());
+
+        long restoredLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * 
restoredLogEndOffset));
+    }
+
     @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
     void leadershipAssignedOnlyOnceWithNetworkPartitionIfThereExistsMajority(
         @ForAll int seed,
-        @ForAll @IntRange(min = 0, max = 3) int numObservers
+        @ForAll @IntRange(min = 0, max = 3) int numObservers,
+        @ForAll boolean withKip853
     ) {
         int numVoters = 5;
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
         scheduler.addInvariant(new StableLeadership(cluster));
 
+        // Start cluster
+        cluster.startAll();

Review Comment:
   you change the meaning of the test if you move this (this tests the scenario where a network partition exists before the cluster starts; the following test addresses the scenario where the partition starts after the cluster has been up for a while)
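
   For reference, the ordering being defended is roughly the following (a sketch; the placeholder filter stands in for whatever partition the test actually installs):
   ```java
   // Install the network partition first, so the test exercises a cluster that
   // starts up while already partitioned.
   router.filter(0, new DropAllTraffic()); // placeholder for the test's real partition filters

   cluster.startAll();
   schedulePolling(scheduler, cluster, 3, 5);
   scheduler.schedule(router::deliverAll, 0, 2, 1);
   scheduler.runUntil(cluster::hasConsistentLeader);
   ```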



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -475,6 +537,104 @@ void canRecoverFromSingleNodeCommittedDataLoss(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddObserverAndRemovePartitionedVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, 1, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition a random voter, and add an observer to replace it

Review Comment:
   ```
   // Partition a random voter. Add first observer as new voter
   ```



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1457,83 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of the following
+                * voter sets has reached the high watermark:
+                * 1. the leader's voter set at the HWM (i.e. the last committed voter set)
+                * 2. the leader's lastVoterSet() (which may or may not be committed)
+                * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted,
+                * which follows from the AtMostOneUncommittedVoterSet invariant.
+                *
+                * A more elaborate check is necessary for this invariant because this method can get called after the
+                * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has
+                * updated its high watermark using the new voter set. In this case, we need to check that the majority
+                * of the last committed voter set has reached the current high watermark.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+                        VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+                            leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {
+                                long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds());
+                                assertTrue(
+                                    nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()),
+                                    "Insufficient nodes have reached current high watermark. Expected at least " +
+                                        cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark);
+                            });
+                            return;
+                        }
+                        assertTrue(
+                            numReachedHighWatermark >= cluster.majoritySize(voterSet.size()),
+                            "Insufficient nodes have reached current high watermark. Expected at least " +
+                                cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark);
+                    });
+                });
+            } else {
+                cluster.leaderHighWatermark().ifPresent(highWatermark -> {
+                    long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet());
+                    assertTrue(
+                        numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()),
+                        "Insufficient nodes have reached current high watermark");
+                });
+            }
+        }
+
+        private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) {
+            return cluster.persistentStates.entrySet().stream()
+                .filter(entry -> voterIds.contains(entry.getKey()))
+                .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
+                .count();
+        }
+    }
+
+    private static class AtMostOneUncommittedVotersRecord implements Invariant {
+        final Cluster cluster;
+
+        private AtMostOneUncommittedVotersRecord(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        @Override
+        public void verify() {
+            cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> {
+                    Optional<LogEntry> uncommittedVotersEntry = Optional.empty();

Review Comment:
   can this just be a bool
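
   If the entry itself is never needed afterwards, a plain flag could replace the Optional, though since it is mutated inside the forEach lambda it needs a mutable holder (a sketch; the per-record handling is only indicated by comments):
   ```java
   // Illustrative only: a single-element array (or AtomicBoolean) serves as a
   // mutable holder for "already saw an uncommitted voters record" in this range.
   boolean[] seenUncommittedVotersRecord = {false};
   leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> {
       // for each voters control record found in the batch:
       //     assertFalse(seenUncommittedVotersRecord[0], "found more than one uncommitted voters record");
       //     seenUncommittedVotersRecord[0] = true;
   });
   ```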


