kamalcph commented on code in PR #21158:
URL: https://github.com/apache/kafka/pull/21158#discussion_r2629330624
##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java:
##########
@@ -3632,6 +3632,73 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
}
+ @Test
+ public void testCopyLagMetricsWithOnlyActiveSegment() {
+ LogSegment activeSegment = mock(LogSegment.class);
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(100L);
+ when(activeSegment.size()).thenReturn(100);
+ when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(1L);
+
+ remoteLogManager.onLeadershipChange(
+ Set.of(mockPartition(leaderTopicIdPartition)), Set.of(),
topicIds);
+ RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask)
remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
+ assertNotNull(rlmTask);
+
+ rlmTask.recordLagStats(mockLog);
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
+ }
+
+ @Test
+ public void testCopyLagMetricsWithMultipleSegments() {
+ LogSegment activeSegment = mock(LogSegment.class);
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(300L);
+ when(activeSegment.size()).thenReturn(100);
+ when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(3L);
+
+ remoteLogManager.onLeadershipChange(
+ Set.of(mockPartition(leaderTopicIdPartition)), Set.of(),
topicIds);
+ RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask)
remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
+ assertNotNull(rlmTask);
+
+ rlmTask.recordLagStats(mockLog);
+ assertEquals(200,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());
+ assertEquals(2,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
+ }
+
+ @Test
+ public void testCopyLagMetricsAfterLeaderChangeWithHigherRemoteOffset() {
+ // Edge case after leader change: active segment base offset is less
than highestOffsetInRemoteStorage
+ // This can happen when a new leader takes over and remote storage has
data uploaded by the previous leader
+ // onlyLocalLogSegmentsSize returns 0 because active segment doesn't
pass the filter (baseOffset < highestOffsetInRemoteStorage)
+ // Without Math.max this would be negative
+ LogSegment activeSegment = mock(LogSegment.class);
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.highestOffsetInRemoteStorage()).thenReturn(125L);
+ when(activeSegment.baseOffset()).thenReturn(100L);
+ when(mockLog.logEndOffset()).thenReturn(150L);
+ when(mockLog.lastStableOffset()).thenReturn(150L);
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(0L);
+ when(activeSegment.size()).thenReturn(100);
+ when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(0L);
+
+ remoteLogManager.onLeadershipChange(
+ Set.of(mockPartition(leaderTopicIdPartition)), Set.of(),
topicIds);
+ RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask)
remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
+ assertNotNull(rlmTask);
+
+ rlmTask.recordLagStats(mockLog);
+
assertTrue(brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes()
>= 0);
+
assertTrue(brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments()
>= 0);
+ assertEquals(0,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());
Review Comment:
do we need the assert in L3696 and L3697? value >= 0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]