ahuang98 commented on code in PR #18987: URL: https://github.com/apache/kafka/pull/18987#discussion_r1980430607
########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable( scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset)); } + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + void canAddVotersIfMajorityIsReachable( + @ForAll int seed, + @ForAll @IntRange(min = 1, max = 3) int numObservers + ) { + int numVoters = 5; + Random random = new Random(seed); + Cluster cluster = new Cluster(numVoters, numObservers, random, true); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet()); + + initializeClusterAndStartAppending(cluster, router, scheduler, 10); + + // Partition the nodes into two sets. Nodes are reachable within each set, + // but the two sets cannot communicate with each other. We should be able + // to make progress even if an election is needed in the larger set. 
+ int firstObserverId = numVoters; + router.filter( + 0, + new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId))) + ); + router.filter( + 1, + new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId))) + ); + router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + + long partitionLogEndOffset = cluster.maxLogEndOffset(); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset)); + + long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1)); + long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4)); + + assertTrue( + majorityHighWatermark > minorityHighWatermark, + String.format( + "majorityHighWatermark = %s, minorityHighWatermark = %s", + majorityHighWatermark, + minorityHighWatermark + ) + ); + + // Verify we can add a voter, since a majority is still reachable + addVoter(cluster, scheduler, firstObserverId, expectedVoterIds); Review Comment: should expectedVoterIds include the observer you're trying to add? 
########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -117,40 +126,58 @@ public class RaftEventSimulationTest { void canElectInitialLeader( @ForAll int seed, @ForAll @IntRange(min = 1, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 5) int numObservers + @ForAll @IntRange(min = 0, max = 5) int numObservers, + @ForAll boolean withKip853 ) { Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); + Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853); MessageRouter router = new MessageRouter(cluster); EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.allReachedHighWatermark(10)); + initializeClusterAndStartAppending(cluster, router, scheduler, 2); + } + + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + void canAddAndRemoveVoters( + @ForAll int seed + ) { + Random random = new Random(seed); + int numVoters = 3; + int numObservers = 2; + Cluster cluster = new Cluster(numVoters, numObservers, random, true); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet()); + + initializeClusterAndStartAppending(cluster, router, scheduler, 10); + + // Add all observers to voter set one by one + for (int voterIdToAdd = numVoters; voterIdToAdd < numVoters + numObservers; voterIdToAdd++) { + addVoter(cluster, scheduler, voterIdToAdd, expectedVoterIds); + } + runUntilNewHighWatermark(cluster, scheduler); Review Comment: what is the purpose of this check? 
highWatermark + 10 seems a bit arbitrary ########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable( scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset)); } + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + void canAddVotersIfMajorityIsReachable( + @ForAll int seed, + @ForAll @IntRange(min = 1, max = 3) int numObservers + ) { + int numVoters = 5; + Random random = new Random(seed); + Cluster cluster = new Cluster(numVoters, numObservers, random, true); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet()); + + initializeClusterAndStartAppending(cluster, router, scheduler, 10); + + // Partition the nodes into two sets. Nodes are reachable within each set, + // but the two sets cannot communicate with each other. We should be able + // to make progress even if an election is needed in the larger set. 
+ int firstObserverId = numVoters; + router.filter( + 0, + new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId))) + ); + router.filter( + 1, + new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId))) + ); + router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + + long partitionLogEndOffset = cluster.maxLogEndOffset(); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset)); + + long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1)); + long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4)); + + assertTrue( Review Comment: what's the point of the assertion? seems a bit inexact (i.e. you should know the exact max HW that the smaller partition has because it cannot have changed after the filters were applied?) 
########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -602,6 +858,14 @@ void schedule(Runnable action, int delayMs, int periodMs, int jitterMs) { queue.offer(event); } + void scheduleUntil(Runnable action, Supplier<Boolean> exitCondition, int delayMs, int periodMs, int jitterMs) { + long initialDeadlineMs = time.milliseconds() + delayMs; + int eventId = eventIdGenerator.incrementAndGet(); + ScheduledUntilConditionMetEvent event = new ScheduledUntilConditionMetEvent(action, eventId, random, initialDeadlineMs, periodMs, jitterMs, exitCondition); + queue.offer(event); + Review Comment: nit: extra new line ########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -475,6 +537,104 @@ void canRecoverFromSingleNodeCommittedDataLoss( scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10)); } + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + void canAddObserverAndRemovePartitionedVoter( + @ForAll int seed, + @ForAll @IntRange(min = 3, max = 5) int numVoters + ) { + Random random = new Random(seed); + Cluster cluster = new Cluster(numVoters, 1, random, true); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet()); + + initializeClusterAndStartAppending(cluster, router, scheduler, 10); + + // Partition a random voter, and add an observer to replace it + RaftNode voterToRemove = cluster.running.get(random.nextInt(numVoters)); + router.filter(voterToRemove.id(), new DropAllTraffic()); + addVoter(cluster, scheduler, numVoters, expectedVoterIds); Review Comment: same question here, does expectedVoterIds need to include the observer you are adding? 
########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -297,18 +309,83 @@ void canMakeProgressIfMajorityIsReachable( scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset)); } + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + void canAddVotersIfMajorityIsReachable( + @ForAll int seed, + @ForAll @IntRange(min = 1, max = 3) int numObservers + ) { + int numVoters = 5; + Random random = new Random(seed); + Cluster cluster = new Cluster(numVoters, numObservers, random, true); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet()); + + initializeClusterAndStartAppending(cluster, router, scheduler, 10); + + // Partition the nodes into two sets. Nodes are reachable within each set, + // but the two sets cannot communicate with each other. We should be able + // to make progress even if an election is needed in the larger set. 
+ int firstObserverId = numVoters; + router.filter( + 0, + new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId))) + ); + router.filter( + 1, + new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4, firstObserverId))) + ); + router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(firstObserverId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + + long partitionLogEndOffset = cluster.maxLogEndOffset(); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset)); + + long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1)); + long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4)); + + assertTrue( + majorityHighWatermark > minorityHighWatermark, + String.format( + "majorityHighWatermark = %s, minorityHighWatermark = %s", + majorityHighWatermark, + minorityHighWatermark + ) + ); + + // Verify we can add a voter, since a majority is still reachable + addVoter(cluster, scheduler, firstObserverId, expectedVoterIds); + + // Now restore the partition and verify everyone catches up + router.filter(0, new PermitAllTraffic()); + router.filter(1, new PermitAllTraffic()); + router.filter(2, new PermitAllTraffic()); + router.filter(3, new PermitAllTraffic()); + router.filter(4, new PermitAllTraffic()); + router.filter(firstObserverId, new PermitAllTraffic()); + + long restoredLogEndOffset = cluster.maxLogEndOffset(); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset)); + } + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void leadershipAssignedOnlyOnceWithNetworkPartitionIfThereExistsMajority( @ForAll int seed, - @ForAll @IntRange(min = 0, max = 3) int numObservers + @ForAll 
@IntRange(min = 0, max = 3) int numObservers, + @ForAll boolean withKip853 ) { int numVoters = 5; Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); + Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853); MessageRouter router = new MessageRouter(cluster); EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); scheduler.addInvariant(new StableLeadership(cluster)); + // Start cluster + cluster.startAll(); Review Comment: you change the meaning of the test if you move this (this tests the scenario where network partition exists before cluster starts, the following test addresses a scenario where network partition starts after cluster is up for a while) ########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -475,6 +537,104 @@ void canRecoverFromSingleNodeCommittedDataLoss( scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10)); } + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) + void canAddObserverAndRemovePartitionedVoter( + @ForAll int seed, + @ForAll @IntRange(min = 3, max = 5) int numVoters + ) { + Random random = new Random(seed); + Cluster cluster = new Cluster(numVoters, 1, random, true); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet()); + + initializeClusterAndStartAppending(cluster, router, scheduler, 10); + + // Partition a random voter, and add an observer to replace it Review Comment: ``` // Partition a random voter. 
Add first observer as new voter ``` ########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -1127,14 +1457,83 @@ private MajorityReachedHighWatermark(Cluster cluster) { @Override public void verify() { - cluster.leaderHighWatermark().ifPresent(highWatermark -> { - long numReachedHighWatermark = cluster.nodes.entrySet().stream() - .filter(entry -> cluster.voters.containsKey(entry.getKey())) - .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) - .count(); - assertTrue( - numReachedHighWatermark >= cluster.majoritySize(), - "Insufficient nodes have reached current high watermark"); + if (cluster.withKip853) { + /* + * For clusters running in KIP-853 mode, we check that a majority of at least one of the following + * voter sets has reached the high watermark: + * 1. the leader's voter set at the HWM (i.e. the last committed voter set) + * 2. the leader's lastVoterSet() (which may or may not be committed) + * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted, + * which follows from the AtMostOneUncommittedVoterSet invariant. + * + * A more elaborate check is necessary for this invariant because this method can get called after the + * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has + * updated its high watermark using the new voter set. In this case, we need to check that the majority + * of the last committed voter set has reached the current high watermark. 
+ * */ + cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> { + leaderNode.client.highWatermark().ifPresent(highWatermark -> { + VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet(); + long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds()); + if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) { + leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> { + long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds()); + assertTrue( + nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()), + "Insufficient nodes have reached current high watermark. Expected at least " + + cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark); + }); + return; + } + assertTrue( + numReachedHighWatermark >= cluster.majoritySize(voterSet.size()), + "Insufficient nodes have reached current high watermark. Expected at least " + + cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark); + }); + }); + } else { + cluster.leaderHighWatermark().ifPresent(highWatermark -> { + long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet()); + assertTrue( + numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()), + "Insufficient nodes have reached current high watermark"); + }); + } + } + + private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) { + return cluster.persistentStates.entrySet().stream() + .filter(entry -> voterIds.contains(entry.getKey())) + .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) + .count(); + } + } + + private static class AtMostOneUncommittedVotersRecord implements Invariant { + final Cluster cluster; + + private AtMostOneUncommittedVotersRecord(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public void verify() { + 
cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> { + leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> { + Optional<LogEntry> uncommittedVotersEntry = Optional.empty(); Review Comment: can this just be a bool? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org