ahuang98 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1975971708
##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +125,101 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
     ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndAddVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 5) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+        int firstObserverId = numVoters;
+        scheduler.schedule(new AddVoterAction(cluster, cluster.running.get(firstObserverId)), 0, 5, 3);
+        scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == numVoters + 1);
+        VoterSet latestVoterSet = cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet();
+        scheduler.runUntil(() -> cluster.allHaveLatestVoterSet(latestVoterSet));

Review Comment:
This is an interesting condition to have - can this just be replaced with `allReachedHighWatermark`?
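For context, a sketch of what that replacement might look like, reusing the test body from the diff above and swapping only the final condition. This is only an illustration of the question, not the PR's code; the high-watermark target of 20 is an arbitrary value chosen here.

```java
// Hypothetical variant of canElectInitialLeaderAndAddVoter that waits on the
// existing high-watermark invariant instead of tracking the leader's latest
// voter set. Everything except the last runUntil is copied from the diff above.
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectInitialLeaderAndAddVoter(
    @ForAll int seed,
    @ForAll @IntRange(min = 1, max = 5) int numVoters,
    @ForAll @IntRange(min = 1, max = 5) int numObservers
) {
    Random random = new Random(seed);
    Cluster cluster = new Cluster(numVoters, numObservers, random, true);
    MessageRouter router = new MessageRouter(cluster);
    EventScheduler scheduler = schedulerWithKip853Invariants(cluster);

    initializeClusterAndStartAppending(cluster, router, scheduler, 10);
    int firstObserverId = numVoters;
    scheduler.schedule(new AddVoterAction(cluster, cluster.running.get(firstObserverId)), 0, 5, 3);
    scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == numVoters + 1);
    // Instead of asserting that every node has observed the exact latest voter set,
    // wait for every node to advance its high watermark to an arbitrary later offset.
    scheduler.runUntil(() -> cluster.allReachedHighWatermark(20));
}
```

Whether the coarser high-watermark check gives the same guarantee as `allHaveLatestVoterSet` is presumably the point of the question: reaching a given high watermark says every node has replicated up to that offset, but it does not by itself pin that offset past the new voter-set record.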
##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -487,6 +555,12 @@ private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) {
         return scheduler;
     }
 
+    private EventScheduler schedulerWithKip853Invariants(Cluster cluster) {
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        scheduler.addInvariant(new AtMostOneUncommittedVoterSet(cluster));

Review Comment:
I understand your points, but I do feel `initialVoterSetFromIds` is strange as it is now, since it returns an empty set when `withKip853=false`. The method name doesn't really convey any reason why the behavior should be different with or without KIP-853 (e.g. an initial voter set should exist regardless).


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +125,101 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
     ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndAddVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 5) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+        int firstObserverId = numVoters;
+        scheduler.schedule(new AddVoterAction(cluster, cluster.running.get(firstObserverId)), 0, 5, 3);
+        scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == numVoters + 1);
+        VoterSet latestVoterSet = cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet();
+        scheduler.runUntil(() -> cluster.allHaveLatestVoterSet(latestVoterSet));
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddAndRemoveVoters(

Review Comment:
So the first test checks that we can add a single voter, and this one checks that we can add multiple voters and then remove all but the first. Would it be worth having a test which confirms we can remove _and then_ add back a voter? And/or a test for removing specifically the leader?
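To make the remove-then-re-add idea concrete, a rough sketch of such a test, assuming the harness pieces visible in this diff (`Cluster`, `AddVoterAction`, `RemoveVoterAction`, `initializeClusterAndStartAppending`, `schedulerWithKip853Invariants`). The test name, parameter ranges, and scheduling intervals are illustrative only and mirror the surrounding tests rather than anything in the PR.

```java
// Sketch only: remove one of the initial voters and then add the same node back,
// verifying the committed voter set shrinks and then recovers. numVoters starts
// at 2 so at least one voter remains after the removal.
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRemoveAndReAddVoter(
    @ForAll int seed,
    @ForAll @IntRange(min = 2, max = 5) int numVoters,
    @ForAll @IntRange(min = 0, max = 5) int numObservers
) {
    Random random = new Random(seed);
    Cluster cluster = new Cluster(numVoters, numObservers, random, true);
    MessageRouter router = new MessageRouter(cluster);
    EventScheduler scheduler = schedulerWithKip853Invariants(cluster);

    initializeClusterAndStartAppending(cluster, router, scheduler, 10);

    // Remove the last initial voter; it keeps running as an observer.
    int voterId = numVoters - 1;
    scheduler.schedule(new RemoveVoterAction(cluster, cluster.running.get(voterId)), 0, 5, 3);
    if (cluster.leaderWithMaxEpoch().isEmpty()) {
        scheduler.runUntil(cluster::hasConsistentLeader);
    }
    scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == numVoters - 1);

    // Then add the same node back and wait for the voter set to recover.
    scheduler.schedule(new AddVoterAction(cluster, cluster.running.get(voterId)), 0, 5, 3);
    scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == numVoters);
}
```

A leader-removal variant could follow the same shape, picking `cluster.leaderWithMaxEpoch().get()` as the node to remove instead of a fixed id.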
##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -487,6 +555,12 @@ private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) {
         return scheduler;
     }
 
+    private EventScheduler schedulerWithKip853Invariants(Cluster cluster) {
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        scheduler.addInvariant(new AtMostOneUncommittedVoterSet(cluster));

Review Comment:
Even if `AtMostOneUncommittedVoterSet` is a trivial check for tests which are not adding or removing voters, conceptually it should hold true. It might be worth making it work for all tests if it is not too difficult.


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +125,101 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853
    ) {
         Random random = new Random(seed);
-        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, withKip853);
         MessageRouter router = new MessageRouter(cluster);
         EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
 
-        cluster.startAll();
-        schedulePolling(scheduler, cluster, 3, 5);
-        scheduler.schedule(router::deliverAll, 0, 2, 1);
-        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-        scheduler.runUntil(cluster::hasConsistentLeader);
-        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+        initializeClusterAndStartAppending(cluster, router, scheduler, 2);
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndAddVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 5) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+        int firstObserverId = numVoters;
+        scheduler.schedule(new AddVoterAction(cluster, cluster.running.get(firstObserverId)), 0, 5, 3);
+        scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == numVoters + 1);
+        VoterSet latestVoterSet = cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet();
+        scheduler.runUntil(() -> cluster.allHaveLatestVoterSet(latestVoterSet));
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddAndRemoveVoters(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 1, max = 10) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithKip853Invariants(cluster);
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Schedule all the observers to be added as voters
+        for (int voterIdToAdd = numVoters; voterIdToAdd < numVoters + numObservers; voterIdToAdd++) {
+            int nextVoterSetSize = voterIdToAdd + 1;
+            scheduler.schedule(new AddVoterAction(cluster, cluster.running.get(voterIdToAdd)), 0, 5, 3);
+            scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == nextVoterSetSize);
+        }
+        int voterSetSize = numVoters + numObservers;
+
+        // Schedule all voters besides the first to be removed
+        for (int voterIdToRemove = numVoters + numObservers - 1; voterIdToRemove > 0; voterIdToRemove--) {
+            int nextVoterSetSize = voterSetSize - 1;
+            scheduler.schedule(new RemoveVoterAction(cluster, cluster.running.get(voterIdToRemove)), 0, 5, 3);
+            if (cluster.leaderWithMaxEpoch().isEmpty()) {
+                scheduler.runUntil(cluster::hasConsistentLeader);
+            }
+            scheduler.runUntil(() -> cluster.leaderWithMaxEpoch().get().client.partitionState().lastVoterSet().size() == nextVoterSetSize);
+        }
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canElectInitialLeaderAndRemoveVoter(

Review Comment:
What does this test that isn't tested by `canAddAndRemoveVoters`?


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -645,39 +837,56 @@ private static class Cluster {
         final AtomicInteger correlationIdCounter = new AtomicInteger();
         final MockTime time = new MockTime();
         final String clusterId = Uuid.randomUuid().toString();
-        final Map<Integer, Node> voters = new HashMap<>();
-        final Map<Integer, PersistentState> nodes = new HashMap<>();
+        final Map<Integer, Node> initialVoters = new HashMap<>();
+        final Map<Integer, PersistentState> persistentStates = new HashMap<>();
         final Map<Integer, RaftNode> running = new HashMap<>();
+        final boolean withKip853;
 
-        private Cluster(int numVoters, int numObservers, Random random) {
+        private Cluster(int numVoters, int numObservers, Random random, boolean withKip853) {
             this.random = random;
+            this.withKip853 = withKip853;
 
             for (int nodeId = 0; nodeId < numVoters; nodeId++) {
-                voters.put(nodeId, nodeFromId(nodeId));
-                nodes.put(nodeId, new PersistentState(nodeId));
+                initialVoters.put(nodeId, nodeFromId(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
 
             for (int nodeIdDelta = 0; nodeIdDelta < numObservers; nodeIdDelta++) {
                 int nodeId = numVoters + nodeIdDelta;
-                nodes.put(nodeId, new PersistentState(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
         }
 
         Set<InetSocketAddress> endpointsFromIds(Set<Integer> nodeIds) {
-            return voters
+            return running
                 .values()
                 .stream()
-                .filter(node -> nodeIds.contains(node.id()))
+                .filter(node -> nodeIds.contains(node.nodeId))
                 .map(Cluster::nodeAddress)
                 .collect(Collectors.toSet());
         }
 
+        Optional<VoterSet> initialVoterSetFromIds() {

Review Comment:
See the previous comment for this method - it feels weird that this would rely on `withKip853`. Can we just remove the `withKip853` conditional and always return a voter set based on `initialVoters`?
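To make the suggestion concrete, one possible shape (a sketch only, not the PR's implementation): drop the `withKip853` guard and always build the result from `initialVoters`. The helper name `voterSetFromNodes` below is a hypothetical placeholder for whatever the test file actually uses to turn the `initialVoters` map into a `VoterSet`.

```java
// Sketch only: always derive the initial voter set from initialVoters, since an
// initial voter set exists whether or not KIP-853 reconfiguration is enabled.
// voterSetFromNodes(...) is a hypothetical stand-in for the existing helper that
// builds a VoterSet from the Map<Integer, Node> of initial voters.
Optional<VoterSet> initialVoterSetFromIds() {
    return Optional.of(voterSetFromNodes(initialVoters));
}
```

Call sites that genuinely need to behave differently in the non-KIP-853 case could then check `withKip853` themselves, which keeps the method name accurate.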