kevin-wu24 commented on code in PR #18987: URL: https://github.com/apache/kafka/pull/18987#discussion_r1980466882
########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -1127,14 +1457,83 @@ private MajorityReachedHighWatermark(Cluster cluster) { @Override public void verify() { - cluster.leaderHighWatermark().ifPresent(highWatermark -> { - long numReachedHighWatermark = cluster.nodes.entrySet().stream() - .filter(entry -> cluster.voters.containsKey(entry.getKey())) - .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) - .count(); - assertTrue( - numReachedHighWatermark >= cluster.majoritySize(), - "Insufficient nodes have reached current high watermark"); + if (cluster.withKip853) { + /* + * For clusters running in KIP-853 mode, we check that a majority of at least one of the following + * voter sets has reached the high watermark: + * 1. the leader's voter set at the HWM (i.e. the last committed voter set) + * 2. the leader's lastVoterSet() (which may or may not be committed) + * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted, + * which follows from the AtMostOneUncommittedVoterSet invariant. + * + * A more elaborate check is necessary for this invariant because this method can get called after the + * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has + * updated its high watermark using the new voter set. In this case, we need to check that the majority + * of the last committed voter set has reached the current high watermark. + * */ + cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> { + leaderNode.client.highWatermark().ifPresent(highWatermark -> { + VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet(); + long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds()); + if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) { + leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> { + long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds()); + assertTrue( + nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()), + "Insufficient nodes have reached current high watermark. Expected at least " + + cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark); + }); + return; + } + assertTrue( + numReachedHighWatermark >= cluster.majoritySize(voterSet.size()), + "Insufficient nodes have reached current high watermark. Expected at least " + + cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark); + }); + }); + } else { + cluster.leaderHighWatermark().ifPresent(highWatermark -> { + long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet()); + assertTrue( + numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()), + "Insufficient nodes have reached current high watermark"); + }); + } + } + + private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) { + return cluster.persistentStates.entrySet().stream() + .filter(entry -> voterIds.contains(entry.getKey())) + .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) + .count(); + } + } + + private static class AtMostOneUncommittedVotersRecord implements Invariant { + final Cluster cluster; + + private AtMostOneUncommittedVotersRecord(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public void verify() { + cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> { + leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> { + Optional<LogEntry> uncommittedVotersEntry = Optional.empty(); Review Comment: Yeah probably. ########## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ########## @@ -1127,14 +1457,83 @@ private MajorityReachedHighWatermark(Cluster cluster) { @Override public void verify() { - cluster.leaderHighWatermark().ifPresent(highWatermark -> { - long numReachedHighWatermark = cluster.nodes.entrySet().stream() - .filter(entry -> cluster.voters.containsKey(entry.getKey())) - .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) - .count(); - assertTrue( - numReachedHighWatermark >= cluster.majoritySize(), - "Insufficient nodes have reached current high watermark"); + if (cluster.withKip853) { + /* + * For clusters running in KIP-853 mode, we check that a majority of at least one of the following + * voter sets has reached the high watermark: + * 1. the leader's voter set at the HWM (i.e. the last committed voter set) + * 2. the leader's lastVoterSet() (which may or may not be committed) + * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted, + * which follows from the AtMostOneUncommittedVoterSet invariant. + * + * A more elaborate check is necessary for this invariant because this method can get called after the + * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has + * updated its high watermark using the new voter set. In this case, we need to check that the majority + * of the last committed voter set has reached the current high watermark. + * */ + cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> { + leaderNode.client.highWatermark().ifPresent(highWatermark -> { + VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet(); + long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds()); + if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) { + leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> { + long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds()); + assertTrue( + nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()), + "Insufficient nodes have reached current high watermark. Expected at least " + + cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark); + }); + return; + } + assertTrue( + numReachedHighWatermark >= cluster.majoritySize(voterSet.size()), + "Insufficient nodes have reached current high watermark. Expected at least " + + cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark); + }); + }); + } else { + cluster.leaderHighWatermark().ifPresent(highWatermark -> { + long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet()); + assertTrue( + numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()), + "Insufficient nodes have reached current high watermark"); + }); + } + } + + private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) { + return cluster.persistentStates.entrySet().stream() + .filter(entry -> voterIds.contains(entry.getKey())) + .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) + .count(); + } + } + + private static class AtMostOneUncommittedVotersRecord implements Invariant { + final Cluster cluster; + + private AtMostOneUncommittedVotersRecord(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public void verify() { + cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> { + leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> { + Optional<LogEntry> uncommittedVotersEntry = Optional.empty(); Review Comment: Yeah probably. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org