divijvaidya commented on code in PR #14349:
URL: https://github.com/apache/kafka/pull/14349#discussion_r1318506043
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -818,56 +818,51 @@ public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
             remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
         }

-        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
             if (!retentionSizeData.isPresent()) {
                 return false;
             }
-            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
-                // Assumption that segments contain size >= 0
-                if (remainingBreachedSize > 0) {
-                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
-                    if (remainingBytes >= 0) {
-                        remainingBreachedSize = remainingBytes;
-                        return true;
-                    }
+            boolean isSegmentDeleted = false;
+            // Assumption that segments contain size >= 0
+            if (remainingBreachedSize > 0) {
+                long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
+                if (remainingBytes >= 0) {
+                    remainingBreachedSize = remainingBytes;
+                    isSegmentDeleted = true;
                 }
+            }
-                return false;
-            });
             if (isSegmentDeleted) {
                 logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
-                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
                         metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
             }
             return isSegmentDeleted;
         }

-        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
-                throws RemoteStorageException, ExecutionException, InterruptedException {
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
             if (!retentionTimeData.isPresent()) {
                 return false;
             }
-            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
-                x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            boolean isSegmentDeleted = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;

Review Comment:
   Should this boolean be renamed to `shouldDeleteSegment`? It now records the decision to delete, not the result of a deletion.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1006,6 +1005,16 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
             // Update log start offset with the computed value after retention cleanup is done
             remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+
+            List<String> undeletedSegments = new ArrayList<>();
+            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
+                if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
+                    undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
+                }
+            }
+            if (!undeletedSegments.isEmpty()) {
+                logger.error("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));

Review Comment:
   This is not an error, since it is recoverable and expected when leadership has transitioned over. Perhaps log it at info.
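The retention-size bookkeeping in the first hunk can be sketched in isolation. This is a hypothetical, self-contained model (class and method names are made up, not Kafka's API): segment sizes are assumed non-negative, and a segment is only marked for deletion while deleting it whole still helps reclaim the breached size.

```java
// Hypothetical sketch of the refactored deleteRetentionSizeBreachedSegments
// bookkeeping; not the actual RemoteLogManager code.
public class RetentionSizeSketch {
    // Bytes still to reclaim before the log is back under the retention size.
    long remainingBreachedSize;

    RetentionSizeSketch(long remainingBreachedSize) {
        this.remainingBreachedSize = remainingBreachedSize;
    }

    // Mirrors the new non-callback shape: decide per segment and mutate the counter,
    // instead of passing a predicate into deleteRemoteLogSegment.
    boolean shouldDeleteForRetentionSize(long segmentSizeInBytes) {
        // Assumption carried over from the diff: segment sizes are >= 0.
        if (remainingBreachedSize > 0) {
            long remainingBytes = remainingBreachedSize - segmentSizeInBytes;
            if (remainingBytes >= 0) {
                remainingBreachedSize = remainingBytes;
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RetentionSizeSketch sketch = new RetentionSizeSketch(250L);
        // First two 100-byte segments fit within the 250 breached bytes;
        // the third does not (only 50 bytes left to reclaim), so it is kept.
        System.out.println(sketch.shouldDeleteForRetentionSize(100L)); // true
        System.out.println(sketch.shouldDeleteForRetentionSize(100L)); // true
        System.out.println(sketch.shouldDeleteForRetentionSize(100L)); // false
        System.out.println(sketch.remainingBreachedSize);              // 50
    }
}
```

Note that because the method mutates `remainingBreachedSize`, the caller must invoke it once per candidate segment in log order, which is why pulling the logic out of the deletion callback changes when the counter is updated relative to the actual delete.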
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1006,6 +1005,16 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
             // Update log start offset with the computed value after retention cleanup is done
             remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

Review Comment:
   We might have a situation where we don't delete the segments because we are not the leader anymore, but we still update the log start offset. It is OK to do that, but please add a comment about it here. For example:
   1. We update the log start offset.
   2. A replica fetch takes place and completes.
   3. The replica updates its log start offset based on our new log start offset.
   4. That replica becomes the new leader.
   5. We skip deleting the files here because we are not the leader anymore.
   6. The new leader will still delete these files, but it will delete them under the "delete everything below log start offset" case.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
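The six-step handoff above can be sketched as a toy simulation. Everything here is hypothetical (the `Segment` record, offsets, and the leader-check flag are invented for illustration, not Kafka types): the point is only that deletion keyed on "end offset below the propagated log start offset" converges to the same state regardless of which broker performs it.

```java
// Hypothetical model of the leadership-transition scenario in the review comment.
import java.util.ArrayList;
import java.util.List;

public class LeadershipHandoffSketch {
    // A remote segment covering the inclusive offset range [baseOffset, endOffset].
    record Segment(long baseOffset, long endOffset) {}

    public static void main(String[] args) {
        List<Segment> remoteSegments = new ArrayList<>(List.of(
                new Segment(0L, 99L),
                new Segment(100L, 199L),
                new Segment(200L, 299L)));

        // Steps 1 and 5: the old leader advances the log start offset past the
        // first two segments, but loses leadership before deleting them.
        long newLogStartOffset = 200L;
        boolean isLeader = false; // leadership has transitioned away
        if (isLeader) {
            remoteSegments.removeIf(s -> s.endOffset() < newLogStartOffset);
        }

        // Steps 2-4 and 6: the new leader has replicated the log start offset and
        // deletes everything that ends below it ("delete below log start offset").
        remoteSegments.removeIf(s -> s.endOffset() < newLogStartOffset);

        // Only the segment at [200, 299] survives, as if the old leader had
        // completed the retention deletion itself.
        System.out.println(remoteSegments);
    }
}
```

This is why the reviewer considers updating the log start offset without deleting to be safe: the offset update is what gets replicated, and the segments below it are eventually removed by whichever broker holds leadership.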