ahuang98 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1975971708


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +125,101 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
     ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, 
withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndAddVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 5) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+        int firstObserverId = numVoters;
+        scheduler.schedule(new AddVoterAction(cluster, 
cluster.running.get(firstObserverId)), 0, 5, 3);
+        scheduler.runUntil(() -> 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size()
 == numVoters + 1);
+        VoterSet latestVoterSet = 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet();
+        scheduler.runUntil(() -> 
cluster.allHaveLatestVoterSet(latestVoterSet));

Review Comment:
   This is an interesting condition to have - can this just be replaced with 
`allReachedHighWatermark`?



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -487,6 +555,12 @@ private EventScheduler 
schedulerWithDefaultInvariants(Cluster cluster) {
         return scheduler;
     }
 
+    private EventScheduler schedulerWithKip853Invariants(Cluster cluster) {
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        scheduler.addInvariant(new AtMostOneUncommittedVoterSet(cluster));

Review Comment:
   I understand your points - but I do feel `initialVoterSetFromIds` is strange 
as it is now, since it returns an empty set when `withKip853` is false. The name 
of the method doesn't convey any reason why the behavior should be different 
with or without KIP-853 (e.g. an initial voter set should exist 
regardless).



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +125,101 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
     ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, 
withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndAddVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 5) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+        int firstObserverId = numVoters;
+        scheduler.schedule(new AddVoterAction(cluster, 
cluster.running.get(firstObserverId)), 0, 5, 3);
+        scheduler.runUntil(() -> 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size()
 == numVoters + 1);
+        VoterSet latestVoterSet = 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet();
+        scheduler.runUntil(() -> 
cluster.allHaveLatestVoterSet(latestVoterSet));
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddAndRemoveVoters(

Review Comment:
   So the first test checks if we can add a single voter - this one checks if 
we can add multiple voters and then remove all but the first voter.
   
   Would it be worth having a test which confirms we can remove _and then_ add 
back the same voter? And/or a test specifically for removing the leader?



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -487,6 +555,12 @@ private EventScheduler 
schedulerWithDefaultInvariants(Cluster cluster) {
         return scheduler;
     }
 
+    private EventScheduler schedulerWithKip853Invariants(Cluster cluster) {
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        scheduler.addInvariant(new AtMostOneUncommittedVoterSet(cluster));

Review Comment:
   Even if `AtMostOneUncommittedVoterSet` is a trivial check for tests which 
are not adding or removing voters, conceptually it should still hold true - it 
might be worth making it work for all tests if that is not too difficult.



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +125,101 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
     ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, 
withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndAddVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 5) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+        int firstObserverId = numVoters;
+        scheduler.schedule(new AddVoterAction(cluster, 
cluster.running.get(firstObserverId)), 0, 5, 3);
+        scheduler.runUntil(() -> 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size()
 == numVoters + 1);
+        VoterSet latestVoterSet = 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet();
+        scheduler.runUntil(() -> 
cluster.allHaveLatestVoterSet(latestVoterSet));
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddAndRemoveVoters(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 10) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Schedule all the observers to be added as voters
+        for (int voterIdToAdd = numVoters; voterIdToAdd < numVoters + 
numObservers; voterIdToAdd++) {
+            int nextVoterSetSize = voterIdToAdd + 1;
+            scheduler.schedule(new AddVoterAction(cluster, 
cluster.running.get(voterIdToAdd)), 0, 5, 3);
+            scheduler.runUntil(() -> 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size()
 == nextVoterSetSize);
+        }
+        int voterSetSize = numVoters + numObservers;
+
+        // Schedule all voters besides the first to be removed
+        for (int voterIdToRemove = numVoters + numObservers - 1; 
voterIdToRemove > 0; voterIdToRemove--) {
+            int nextVoterSetSize = voterSetSize - 1;
+            scheduler.schedule(new RemoveVoterAction(cluster, 
cluster.running.get(voterIdToRemove)), 0, 5, 3);
+            if (cluster.leaderWithMaxEpoch().isEmpty()) {
+                scheduler.runUntil(cluster::hasConsistentLeader);
+            }
+            scheduler.runUntil(() -> 
cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size()
 == nextVoterSetSize);
+        }
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndRemoveVoter(

Review Comment:
   What does this test cover that isn't already tested by `canAddAndRemoveVoters`?



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -645,39 +837,56 @@ private static class Cluster {
         final AtomicInteger correlationIdCounter = new AtomicInteger();
         final MockTime time = new MockTime();
         final String clusterId = Uuid.randomUuid().toString();
-        final Map<Integer, Node> voters = new HashMap<>();
-        final Map<Integer, PersistentState> nodes = new HashMap<>();
+        final Map<Integer, Node> initialVoters = new HashMap<>();
+        final Map<Integer, PersistentState> persistentStates = new HashMap<>();
         final Map<Integer, RaftNode> running = new HashMap<>();
+        final boolean withKip853;
 
-        private Cluster(int numVoters, int numObservers, Random random) {
+        private Cluster(int numVoters, int numObservers, Random random, 
boolean withKip853) {
             this.random = random;
+            this.withKip853 = withKip853;
 
             for (int nodeId = 0; nodeId < numVoters; nodeId++) {
-                voters.put(nodeId, nodeFromId(nodeId));
-                nodes.put(nodeId, new PersistentState(nodeId));
+                initialVoters.put(nodeId, nodeFromId(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
 
             for (int nodeIdDelta = 0; nodeIdDelta < numObservers; 
nodeIdDelta++) {
                 int nodeId = numVoters + nodeIdDelta;
-                nodes.put(nodeId, new PersistentState(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
         }
 
         Set<InetSocketAddress> endpointsFromIds(Set<Integer> nodeIds) {
-            return voters
+            return running
                 .values()
                 .stream()
-                .filter(node -> nodeIds.contains(node.id()))
+                .filter(node -> nodeIds.contains(node.nodeId))
                 .map(Cluster::nodeAddress)
                 .collect(Collectors.toSet());
         }
 
+        Optional<VoterSet> initialVoterSetFromIds() {

Review Comment:
   See my previous comment on this method - it feels weird that this would rely 
on `withKip853`. Can we just remove the `withKip853` conditional and always 
return a `VoterSet` based on `initialVoters`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to