kamalcph commented on code in PR #16959:
URL: https://github.com/apache/kafka/pull/16959#discussion_r1729103793


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2338,6 +2322,122 @@ public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
+    @Test
+    public void testRemoteLogSizeRetentionShouldFilterOutCopySegmentStartState()
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        int segmentSize = 1024;
+        Map<String, Long> logProps = new HashMap<>();
+        // start by disabling retention.ms/bytes
+        logProps.put("retention.bytes", -1L);
+        logProps.put("retention.ms", -1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(2000L);
+
+        // Build the remote log metadata list:
+        // s1. One segment in "COPY_SEGMENT_STARTED" state, simulating a segment that failed while copying to remote storage.
+        //     In the 1st run, this dangling segment should be deleted even though retention.ms/bytes are disabled (-1).
+        // s2. Another segment in "COPY_SEGMENT_STARTED" state, simulating a segment that is still being copied to remote storage.
+        //     Its state will change to "COPY_SEGMENT_FINISHED" before the deletion check.
+        //     In the 1st run, this segment should not be deleted because it has changed to "COPY_SEGMENT_FINISHED".
+        //     In the 2nd run, it should be counted when calculating the remote storage log size.
+        // s3. One segment in "DELETE_SEGMENT_FINISHED" state, simulating a remoteLogMetadataManager that returns it without filtering.
+        //     It should be filtered out both when calculating the remote storage log size and during deletion.
+        // s4. One segment in "DELETE_SEGMENT_STARTED" state, simulating a segment whose remote log deletion failed.
+        //     It should be counted when calculating the remote storage log size.
+        // s5. 11 segments in "COPY_SEGMENT_FINISHED" state. These are expected to be counted when calculating the remote storage log size.
+        //
+        // In the 1st run, because retention.ms/bytes are disabled (-1), only s1 is deleted.
+        // In the 2nd run, the total remote storage size is 1024 * 13 (s2, s4, s5) while the retention size is 10240,
+        // so 3 segments are deleted because the retention size is breached.
+        RemoteLogSegmentMetadata s1 = createRemoteLogSegmentMetadata(leaderTopicIdPartition,
+                0, 99, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s2CopyStarted = createRemoteLogSegmentMetadata(leaderTopicIdPartition,
+                200, 299, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+        RemoteLogSegmentMetadata s3 = createRemoteLogSegmentMetadata(leaderTopicIdPartition,
+                0, 99, segmentSize, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
+        RemoteLogSegmentMetadata s4 = createRemoteLogSegmentMetadata(leaderTopicIdPartition,
+                0, 99, segmentSize, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+        List<RemoteLogSegmentMetadata> s5 =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 11, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        List<RemoteLogSegmentMetadata> metadataList = new LinkedList<>();
+        metadataList.addAll(Arrays.asList(s1, s2CopyStarted, s3, s4));
+        metadataList.addAll(s5);
+
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(leaderTopicIdPartition, 0, 0))
+                .thenReturn(Optional.of(metadataList.get(0)));
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        // Return "COPY_SEGMENT_FINISHED" state when checking s2 during deletion, to simulate a segment that has completed the copy process.
+        // It should not be treated as a dangling segment.
+        RemoteLogSegmentMetadata s2CopyFinished = createRemoteLogSegmentMetadata(leaderTopicIdPartition,
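The state accounting described in the s1–s5 comments (count COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED; skip COPY_SEGMENT_STARTED and DELETE_SEGMENT_FINISHED) can be sketched standalone. This is a minimal, hypothetical model — the class, record, and method names below are illustrative, not Kafka's actual types:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the retention-size accounting the test comments describe.
// Names are illustrative, not Kafka's API.
public class RetentionSizeSketch {
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    record Segment(String id, int sizeBytes, SegmentState state) { }

    // Only fully copied segments (COPY_SEGMENT_FINISHED) and segments whose
    // deletion started but never finished (DELETE_SEGMENT_STARTED) count
    // toward the remote log size used for retention.bytes checks.
    static long remoteLogSize(List<Segment> segments) {
        return segments.stream()
                .filter(s -> s.state() == SegmentState.COPY_SEGMENT_FINISHED
                          || s.state() == SegmentState.DELETE_SEGMENT_STARTED)
                .mapToLong(Segment::sizeBytes)
                .sum();
    }

    public static void main(String[] args) {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment("s1", 1024, SegmentState.COPY_SEGMENT_STARTED));    // dangling copy: excluded
        segments.add(new Segment("s2", 1024, SegmentState.COPY_SEGMENT_FINISHED));   // copy completed: counted
        segments.add(new Segment("s3", 1024, SegmentState.DELETE_SEGMENT_FINISHED)); // already deleted: excluded
        segments.add(new Segment("s4", 1024, SegmentState.DELETE_SEGMENT_STARTED));  // failed delete: counted
        for (int i = 0; i < 11; i++) {                                               // s5: 11 copied segments, counted
            segments.add(new Segment("s5-" + i, 1024, SegmentState.COPY_SEGMENT_FINISHED));
        }
        System.out.println(remoteLogSize(segments)); // prints 13312 (1024 * 13: s2, s4, s5)
    }
}
```

With this accounting, a 10240-byte retention limit over 13312 counted bytes is what forces the 3-segment deletion the test expects in the 2nd run.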

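A side note on the stubbing in the diff: `listRemoteLogSegments` is stubbed once with `thenReturn(metadataList.iterator())` and once with `thenAnswer(ans -> metadataList.iterator())`. The difference matters because an `Iterator` is single-use. A stdlib-only sketch of the pitfall (no Mockito; names are illustrative):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Why a test may stub an iterator-returning method with thenAnswer(...)
// rather than thenReturn(list.iterator()): a single Iterator instance is
// exhausted after one pass, so every later call sees an empty result.
// Producing a fresh iterator per call (what thenAnswer does) avoids that.
public class IteratorStubPitfall {
    static int count(Iterator<?> it) {
        int n = 0;
        while (it.hasNext()) { it.next(); n++; }
        return n;
    }

    public static void main(String[] args) {
        List<String> segments = Arrays.asList("s1", "s2", "s3");

        // thenReturn-style: the same iterator instance is handed out each time
        Iterator<String> shared = segments.iterator();
        int firstPass = count(shared);   // 3
        int secondPass = count(shared);  // 0 -- already exhausted

        // thenAnswer-style: a fresh iterator is produced on each call
        Supplier<Iterator<String>> fresh = segments::iterator;
        int thirdPass = count(fresh.get());  // 3
        int fourthPass = count(fresh.get()); // 3

        System.out.println(firstPass + " " + secondPass + " " + thirdPass + " " + fourthPass);
        // prints 3 0 3 3
    }
}
```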
Review Comment:
   nit: can we keep the same segmentId for `s2CopyFinished` and `s2CopyStarted`? This will avoid any test flakiness in the future.
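One way to implement the suggestion is to derive `s2CopyFinished` from `s2CopyStarted` rather than creating it from scratch, so both share one segment id. A hypothetical, self-contained sketch — `SegmentMetadata` here is an illustrative stand-in, not Kafka's `RemoteLogSegmentMetadata`:

```java
import java.util.UUID;

// Hypothetical sketch of the reviewer's suggestion: derive the "finished"
// metadata from the "started" one so both share the same segment id, instead
// of generating a second random id that lookups keyed on the id could miss.
public class SameSegmentIdSketch {
    record SegmentMetadata(UUID segmentId, String state) {
        SegmentMetadata withState(String newState) {
            // same id, only the state changes
            return new SegmentMetadata(segmentId, newState);
        }
    }

    public static void main(String[] args) {
        SegmentMetadata s2CopyStarted = new SegmentMetadata(UUID.randomUUID(), "COPY_SEGMENT_STARTED");
        SegmentMetadata s2CopyFinished = s2CopyStarted.withState("COPY_SEGMENT_FINISHED");
        // ids match, so any lookup keyed on the segment id sees both as one segment
        System.out.println(s2CopyStarted.segmentId().equals(s2CopyFinished.segmentId())); // prints true
    }
}
```

Because the two metadata objects can never disagree on the id, stubs matched on that id behave deterministically, which is what removes the flakiness risk.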


