jsancio commented on code in PR #16079: URL: https://github.com/apache/kafka/pull/16079#discussion_r1625072194
########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(20L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } + @Test + public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { + int node1 = 1; + int node2 = 2; + Set<Integer> originalVoterSet = mkSet(localId, node1, node2); + LeaderState<?> state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // removing node1 should not decrement HW to 10L + Set<Integer> voterSetWithoutNode1 = mkSet(localId, node2); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutNode1)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW cannot change until after node2 catches up to last HW + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW should update to 16L + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + + @Test + public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { + int node1 = 1; + int node2 = 2; + Set<Integer> originalVoterSet = mkSet(localId, node1, node2); + LeaderState<?> state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // removing leader should not decrement HW to 10L + Set<Integer> voterSetWithoutLeader = mkSet(node1, node2); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutLeader)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW cannot change until node2 catches up to last HW Review Comment: Before checking this, let's increase node1's LEO to 16 and show that it doesn't increase the HWM. ########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(20L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } + @Test + public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { Review Comment: Are you planning to add a test for adding a follower to the voter set? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org