jsancio commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1625072194


##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() {
         assertEquals(Optional.empty(), state.highWatermark());
         assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(20L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), 
voterSet));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
         assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
         assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
     }
 
+    @Test
+    public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() {
+        int node1 = 1;
+        int node2 = 2;
+        Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
+        LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
+        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+        // removing node1 should not decrement HW to 10L
+        Set<Integer> voterSetWithoutNode1 = mkSet(localId, node2);
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
voterSetWithoutNode1));
+        assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+        // HW cannot change until after node2 catches up to last HW
+        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(14L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+        // HW should update to 16L
+        assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(16L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
+    }
+
+    @Test
+    public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() {
+        int node1 = 1;
+        int node2 = 2;
+        Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
+        LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
+        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+        assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+        // removing leader should not decrement HW to 10L
+        Set<Integer> voterSetWithoutLeader = mkSet(node1, node2);
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
voterSetWithoutLeader));
+        assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
+
+        // HW cannot change until node2 catches up to last HW

Review Comment:
   Before checking this, let's increase node1's LEO to 16 and show that it 
doesn't increase the HWM.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -272,20 +280,77 @@ public void testUpdateHighWatermarkQuorumSizeThree() {
         assertEquals(Optional.empty(), state.highWatermark());
         assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(20L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), 
voterSet));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
         assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
         assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
     }
 
+    @Test
+    public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() {

Review Comment:
   Are you planning to add a test for adding a follower to the voter set?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to