showuon commented on code in PR #14349:
URL: https://github.com/apache/kafka/pull/14349#discussion_r1320504527


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1004,19 +999,25 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
                     // remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
                     // the epochs present in the segment lie in the checkpoint file. It will always return false
                     // since the checkpoint file was already truncated.
-                    boolean isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
+                    boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
                             metadata, logStartOffset, epochWithOffsets);
+                    if (shouldDeleteSegment) {
+                        segmentsToDelete.add(metadata);
+                    }
                     boolean isValidSegment = false;
-                    if (!isSegmentDeleted) {
+                    if (!shouldDeleteSegment) {
                         // check whether the segment contains the required epoch range within the current leader epoch lineage.
                         isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
                         if (isValidSegment) {
-                            isSegmentDeleted =
+                            shouldDeleteSegment =
                                     remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
                                             remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                            if (shouldDeleteSegment) {
+                                segmentsToDelete.add(metadata);
+                            }

Review Comment:
   nit: These `if` blocks and L1004-1006 could be put before `canProcess = shouldDeleteSegment || !isValidSegment;`. I.e.
   ```
   boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
           metadata, logStartOffset, epochWithOffsets);
   boolean isValidSegment = false;

   if (!shouldDeleteSegment) {
       ...
       if (isValidSegment) {
           shouldDeleteSegment = remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
                   remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
       }
   }
   if (shouldDeleteSegment) {
       segmentsToDelete.add(metadata);
   }

   canProcess = shouldDeleteSegment || !isValidSegment;
   ```
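   Fully expanded (filling the `...` with the epoch-range check already shown in the hunk above; just a sketch of the same logic, not new behavior):
   ```
   boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
           metadata, logStartOffset, epochWithOffsets);
   boolean isValidSegment = false;

   if (!shouldDeleteSegment) {
       // check whether the segment contains the required epoch range within the current leader epoch lineage
       isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
       if (isValidSegment) {
           shouldDeleteSegment = remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
                   remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
       }
   }
   // a single add site, instead of one per branch
   if (shouldDeleteSegment) {
       segmentsToDelete.add(metadata);
   }

   canProcess = shouldDeleteSegment || !isValidSegment;
   ```
   This way `segmentsToDelete.add(metadata)` appears once instead of twice.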
   



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1508,16 +1509,184 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
         }
     }
 
+    @Test
+    public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(0);
+
+            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            // retention.bytes = 0 makes every remote segment breach the size-based retention limit
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", 0L);
+            logProps.put("retention.ms", -1L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+
+            task.run();
+
+            // both segments are deleted and the log start offset advances to the log end offset
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+        }
+    }
+
+    @Test
+    public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(0);
+
+            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            // retention.ms = 0 makes every remote segment breach the time-based retention limit
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", -1L);
+            logProps.put("retention.ms", 0L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
+
+            task.run();
+
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+        }
+    }
+
+    @Test
+    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException {
+        AtomicLong logStartOffset = new AtomicLong(0);
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> logStartOffset.set(offset),
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+            task.convertToLeader(0);
+
+            when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+            when(mockLog.logEndOffset()).thenReturn(200L);
+
+            List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+            List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
+
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+            when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                    .thenReturn(remoteLogSegmentMetadatas.iterator())
+                    .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+            checkpoint.write(epochEntries);
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+            when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+            Map<String, Long> logProps = new HashMap<>();
+            logProps.put("retention.bytes", -1L);
+            logProps.put("retention.ms", 0L);
+            LogConfig mockLogConfig = new LogConfig(logProps);
+            when(mockLog.config()).thenReturn(mockLogConfig);
+
+            when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                    .thenAnswer(answer -> {
+                        // cancel the task so that we don't delete the second segment
+                        task.cancel();
+                        return CompletableFuture.runAsync(() -> { });
+                    });
+
+            task.run();
+
+            // only the first segment's data is deleted before the cancellation takes effect
+            assertEquals(200L, logStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+            verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));

Review Comment:
   I think this is a good place to continue verifying the situation we want to protect against:
   ```
   // If the follower HAS picked up the changes, and they become the leader, this replica won't successfully complete the deletion.
   // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
   // and delete them accordingly.

   // If the follower HAS NOT picked up the changes, and they become the leader, then they will go through this process
   // again and delete them with the original deletion reason, i.e. size, time, or log start offset breach.
   ```

   So, I'm thinking we can continue the test with something like:
   ```
   RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
   task.convertToLeader(1);
   ....
   task.run();

   assertEquals(200L, logStartOffset.get());
   verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
   ```
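   For concreteness, the `....` would roughly be the re-stubbing needed for the second run (just a sketch; it reuses the fixture names from the test above, and whether the extra `listRemoteLogSegments` stub is needed here is an assumption):
   ```
   // hypothetical continuation; names reuse the fixture above and the extra stub is an assumption
   RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
   newLeaderTask.convertToLeader(1);
   // let the new leader see the remaining segment when it lists remote segments
   when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
           .thenReturn(remoteLogSegmentMetadatas.iterator());
   newLeaderTask.run();

   assertEquals(200L, logStartOffset.get());
   // the segment skipped by the cancelled task is now deleted with the original deletion reason
   verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
   ```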
   
   WDYT?


