hachikuji commented on code in PR #12508: URL: https://github.com/apache/kafka/pull/12508#discussion_r948237284
########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -119,6 +122,93 @@ public void testNonMonotonicLocalEndOffsetUpdate() { () -> state.updateLocalState(0, new LogOffsetMetadata(15L))); } + @Test + public void testLastCaughtUpTimeVoters() { + int node1 = 1; + int node2 = 2; + int currentTime = 1000; + int fetchTime = 0; + int caughtupTime = -1; + LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 10L); + assertEquals(Optional.empty(), state.highWatermark()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + + // Node 1 falls behind + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L), 11L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L), 11L)); + caughtupTime = fetchTime; + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node 1 falls behind + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L), 100L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node 1 catches up to the last fetch offset + int prevFetchTime = fetchTime; + assertFalse(state.updateReplicaState(node1, 
++fetchTime, new LogOffsetMetadata(102L), 200L)); + caughtupTime = prevFetchTime; + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node2 has never caught up to leader + assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(), -1L); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(202L), 300L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(), -1L); + assertFalse(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L), 300L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(), -1L); + } + + @Test + public void testLastCaughtUpTimeObserver() { + int node1Index = 0; + int node1Id = 1; + int currentTime = 1000; + int fetchTime = 0; + int caughtupTime = -1; Review Comment: nit: `caughtUpTime`? ########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -224,7 +314,9 @@ public void testGetVoterStates() { mkEntry(localId, leaderEndOffset), mkEntry(node1, leaderStartOffset), mkEntry(node2, leaderEndOffset) - ), state.getVoterEndOffsets()); + ), state.quorumResponseVoterStates(0) + .stream() Review Comment: nit: indentation looks kind of strange here. Probably should be 4 spaces? Also below on lines 344 and 368. 
########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -188,13 +278,13 @@ public void testNonMonotonicHighWatermarkUpdate() { int node1 = 1; LeaderState<?> state = newLeaderState(mkSet(localId, node1), 0L); state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L)); - state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L)); + state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L), 11L); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state. // The leader will report an error in the logs, but will not let the high watermark rewind - assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L))); - assertEquals(5L, state.getVoterEndOffsets().get(node1)); + assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L), 6L)); Review Comment: Why does the leader end offset go backwards here? Previously it was at 11. 
########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -119,6 +122,93 @@ public void testNonMonotonicLocalEndOffsetUpdate() { () -> state.updateLocalState(0, new LogOffsetMetadata(15L))); } + @Test + public void testLastCaughtUpTimeVoters() { + int node1 = 1; + int node2 = 2; + int currentTime = 1000; + int fetchTime = 0; + int caughtupTime = -1; + LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 10L); + assertEquals(Optional.empty(), state.highWatermark()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + + // Node 1 falls behind + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L), 11L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L), 11L)); + caughtupTime = fetchTime; + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node 1 falls behind + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L), 100L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node 1 catches up to the last fetch offset + int prevFetchTime = fetchTime; + assertFalse(state.updateReplicaState(node1, 
++fetchTime, new LogOffsetMetadata(102L), 200L)); + caughtupTime = prevFetchTime; + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(), caughtupTime); + + // Node2 has never caught up to leader + assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(), -1L); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(202L), 300L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(), -1L); + assertFalse(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L), 300L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(), -1L); + } + + @Test + public void testLastCaughtUpTimeObserver() { + int node1Index = 0; + int node1Id = 1; + int currentTime = 1000; + int fetchTime = 0; + int caughtupTime = -1; + LeaderState<?> state = newLeaderState(singleton(localId), 5L); + assertEquals(Optional.empty(), state.highWatermark()); + assertEquals(emptySet(), state.nonAcknowledgingVoters()); + + // Node 1 falls behind + assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(10L), 11L)); + assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(), currentTime); Review Comment: The expected value should always be the first argument. A few of these in this test class. 
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -217,20 +219,36 @@ public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffset * @param replicaId replica id * @param fetchTimestamp fetch timestamp * @param logOffsetMetadata new log offset and metadata + * @param leaderLogEndOffset current log end offset of the leader * @return true if the high watermark is updated too */ - public boolean updateReplicaState(int replicaId, - long fetchTimestamp, - LogOffsetMetadata logOffsetMetadata) { + public boolean updateReplicaState( + int replicaId, + long fetchTimestamp, + LogOffsetMetadata logOffsetMetadata, + long leaderLogEndOffset + ) { // Ignore fetches from negative replica id, as it indicates // the fetch is from non-replica. For example, a consumer. if (replicaId < 0) { return false; } ReplicaState state = getReplicaState(replicaId); - state.updateFetchTimestamp(fetchTimestamp); + + // Only proceed with updating the states if the offset update is valid + verifyEndOffsetUpdate(state, logOffsetMetadata); + + // Update the Last CaughtUp Time + if (logOffsetMetadata.offset >= leaderLogEndOffset) { + state.updateLastCaughtUpTimestamp(fetchTimestamp); + } else if (logOffsetMetadata.offset >= state.lastFetchLeaderLogEndOffset.orElse(-1L)) { + state.updateLastCaughtUpTimestamp(state.lastFetchTimestamp.orElse(-1L)); + } + + state.updateFetchTimestamp(fetchTimestamp, leaderLogEndOffset); return updateEndOffset(state, logOffsetMetadata); + Review Comment: nit: unneeded newline ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -246,20 +264,26 @@ private List<ReplicaState> followersByDescendingFetchOffset() { .collect(Collectors.toList()); } - private boolean updateEndOffset(ReplicaState state, - LogOffsetMetadata endOffsetMetadata) { + private void verifyEndOffsetUpdate( + ReplicaState state, + LogOffsetMetadata endOffsetMetadata + ) { state.endOffset.ifPresent(currentEndOffset -> { if 
(currentEndOffset.offset > endOffsetMetadata.offset) { if (state.nodeId == localId) { throw new IllegalStateException("Detected non-monotonic update of local " + - "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); + "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); Review Comment: nit: this indentation should be reverted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org