kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1980465079


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddVotersIfMajorityIsReachable(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 3) int numObservers
+    ) {
+        int numVoters = 5;
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition the nodes into two sets. Nodes are reachable within each set,
+        // but the two sets cannot communicate with each other. We should be able
+        // to make progress even if an election is needed in the larger set.
+        int firstObserverId = numVoters;
+        router.filter(
+            0,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(
+            1,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+
+        long partitionLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset));
+
+        long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1));
+        long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4));
+
+        assertTrue(
+            majorityHighWatermark > minorityHighWatermark,
+            String.format(
+                "majorityHighWatermark = %s, minorityHighWatermark = %s",
+                majorityHighWatermark,
+                minorityHighWatermark
+            )
+        );
+
+        // Verify we can add a voter, since a majority is still reachable
+        addVoter(cluster, scheduler, firstObserverId, expectedVoterIds);
+
+        // Now restore the partition and verify everyone catches up
+        router.filter(0, new PermitAllTraffic());
+        router.filter(1, new PermitAllTraffic());
+        router.filter(2, new PermitAllTraffic());
+        router.filter(3, new PermitAllTraffic());
+        router.filter(4, new PermitAllTraffic());
+        router.filter(firstObserverId, new PermitAllTraffic());
+
+        long restoredLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset));
+    }
+
     @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
     void leadershipAssignedOnlyOnceWithNetworkPartitionIfThereExistsMajority(
         @ForAll int seed,
-        @ForAll @IntRange(min = 0, max = 3) int numObservers
+        @ForAll @IntRange(min = 0, max = 3) int numObservers,
+        @ForAll boolean withKip853
     ) {
         int numVoters = 5;
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
         scheduler.addInvariant(new StableLeadership(cluster));
 
+        // Start cluster
+        cluster.startAll();

Review Comment:
   I guess what we mean by "cluster starting" is not super clear. I think the intention of this test is to have a network partition exist before the KRaft protocol starts (i.e. before nodes send messages to each other and perform the initial leader election). This is what the test still does now, since the partition is defined before the calls to the following:
   ```
           schedulePolling(scheduler, cluster, 3, 5);
           scheduler.schedule(router::deliverAll, 0, 2, 1);
           scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 1);
           scheduler.runUntil(cluster::hasConsistentLeader);
   ```
   What `cluster.startAll()` does is just update the `Cluster`'s state and initialize everything KRaft uses; if no messages are being delivered and no polling is occurring, I don't think we've actually "started" yet.
   
   I think the function naming is misleading; it should be something like `initializeNodes()`. What "starts" KRaft, from my perspective, are the lines above.
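   To make the distinction concrete, here is a minimal, self-contained sketch. The `Node`/`Cluster` classes and the `initializeNodes()` name below are illustrative only, not the actual `RaftEventSimulationTest` API: initialization merely sets up per-node state, while the protocol only "starts" once something drives message delivery.
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class StartVsInitialize {
       static class Node {
           boolean initialized;
           int messagesSeen;
       }
   
       static class Cluster {
           final List<Node> nodes = new ArrayList<>();
   
           Cluster(int n) {
               for (int i = 0; i < n; i++) {
                   nodes.add(new Node());
               }
           }
   
           // Analogous to cluster.startAll(): allocates per-node state, sends nothing.
           void initializeNodes() {
               nodes.forEach(node -> node.initialized = true);
           }
   
           // Analogous to scheduling router::deliverAll: only now do nodes
           // actually observe protocol traffic.
           void deliverAll() {
               nodes.forEach(node -> node.messagesSeen++);
           }
       }
   
       public static void main(String[] args) {
           Cluster cluster = new Cluster(3);
           cluster.initializeNodes();
           // After initialization alone, nodes are set up but have seen no traffic.
           System.out.println(cluster.nodes.get(0).initialized);  // true
           System.out.println(cluster.nodes.get(0).messagesSeen); // 0
           cluster.deliverAll();
           System.out.println(cluster.nodes.get(0).messagesSeen); // 1
       }
   }
   ```
   Under this framing, a renamed `initializeNodes()` would make it obvious that the partition filters installed before scheduling still take effect before any election activity.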



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
