kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1981774312


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddVotersIfMajorityIsReachable(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 3) int numObservers
+    ) {
+        int numVoters = 5;
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition the nodes into two sets. Nodes are reachable within each set,
+        // but the two sets cannot communicate with each other. We should be able
+        // to make progress even if an election is needed in the larger set.
+        int firstObserverId = numVoters;
+        router.filter(
+            0,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(
+            1,
+            new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId)))
+        );
+        router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+        router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1))));
+
+        long partitionLogEndOffset = cluster.maxLogEndOffset();
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset));
+
+        long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1));
+        long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4));
+
+        assertTrue(

Review Comment:
   > seems a bit inexact (i.e. you should know the exact max HW that the smaller partition has because it cannot have changed after the filters were applied?)
   
   This is not exactly true, mainly because of how the framework works. The HWM on the minority can still advance a little after we partition the network: before the partition takes effect, the network router could have delivered fetch responses that would advance the minority nodes' HWM, but those messages are still sitting in each node's message queue and haven't been polled yet to update internal state. When we call:
   ```
   scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset));
   ```
   those minority nodes don't receive any new messages because of the filter, but their `poll` events can still apply the buffered responses and advance their HWM slightly.
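   
   To make that race concrete, here is a hypothetical, self-contained sketch (the class, the queue, and the offsets are invented for illustration and are not code from this PR) of how a response delivered before the filter was installed can still advance a node's HWM on a later poll:
   ```
   import java.util.ArrayDeque;
   import java.util.Queue;
   
   // Hypothetical sketch: a fetch response delivered before the partition
   // filter was installed sits in the node's inbound queue and is only
   // applied on the next poll, so the minority HWM can still move slightly
   // after the split.
   public class BufferedHwmSketch {
       public static void main(String[] args) {
           Queue<Long> inbound = new ArrayDeque<>();
           long highWatermark = 10L;
   
           // The router delivers a fetch response that advances the HWM to 12;
           // only then is the DropOutboundRequestsTo filter installed, so
           // nothing new arrives after this point.
           inbound.add(12L);
   
           // The node's next poll still drains the already-delivered response.
           while (!inbound.isEmpty()) {
               highWatermark = Math.max(highWatermark, inbound.poll());
           }
           System.out.println("HWM after poll: " + highWatermark); // prints 12, not 10
       }
   }
   ```
   This is why a bounded comparison against the majority's HWM is safer here than asserting an exact, frozen value for the minority.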


