divijvaidya commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1224420683
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. 
+ return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + return; + } + + final long retentionSize = log.config().retentionSize; + final boolean checkSizeRetention = retentionSize > -1; + + final long retentionMs = log.config().retentionMs; + final boolean checkTimestampRetention = retentionMs > -1; + + // Iterate once + // - to build the log size of segments with base-offset < local-log-start-offset + // - to collect all the epochs of remote log segments + // These values can be cached and updated in RLMTask for this topic partition without computing in each + // iteration. But the logic can become little complex and need to cover different scenarios to avoid any + // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion. 
+ final Set<Integer> epochsSet = new HashSet<>(); + long totalSizeEarlierToLocalLogStartOffset = 0L; + while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + + if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) { + totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes(); + } + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(checkSizeRetention, retentionSize, log, totalSizeEarlierToLocalLogStartOffset); + Optional<RetentionTimeData> retentionTimeData = checkTimestampRetention + ? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs)) + : Optional.empty(); + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochEntryIterator.hasNext()) { + EpochEntry epochEntry = epochEntryIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + return; Review Comment: please add an info log explaining why we exited the function prior to its completion. It greatly helps debugging when we don't have to guess where the return point was.
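A minimal sketch of the kind of log being asked for, reusing the `logger`, `isCancelled()`, `isLeader()` and `topicIdPartition` that already appear in the hunk (the message wording is only illustrative):

```java
while (isSegmentDeleted && segmentsIterator.hasNext()) {
    if (isCancelled() || !isLeader()) {
        // State explicitly which early-return path was taken so that debugging
        // does not require guessing where the cleanup loop stopped.
        logger.info("Stopping remote log segment cleanup for {}: task was cancelled or leadership changed",
                topicIdPartition);
        return;
    }
    // ... existing per-segment deletion logic ...
}
```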
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. 
+ return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { Review Comment: nit unnecessary else ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -945,4 +1176,27 @@ public void close() { } } + private static class RetentionSizeData { + private final long retentionSize; + private final long remainingBreachedSize; + + public RetentionSizeData(long retentionSize, long remainingBreachedSize) { + this.retentionSize = retentionSize; Review Comment: please perform an argument validation here. If retentionSize < remainingBreachedSize, then IllegalArgumentException. 
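A sketch of the requested constructor guard, using exactly the condition stated in the comment (whether that invariant is the right one is for the PR author to confirm):

```java
public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
    if (retentionSize < remainingBreachedSize) {
        throw new IllegalArgumentException("remainingBreachedSize (" + remainingBreachedSize
                + ") must not exceed retentionSize (" + retentionSize + ")");
    }
    this.retentionSize = retentionSize;
    this.remainingBreachedSize = remainingBreachedSize;
}
```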
Same for RetentionTimeData ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. 
+ return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; Review Comment: please add a debug log here (and at the other places where we exit this function) so that, while debugging, we know where we exited the function from.
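One possible shape of the requested debug logs, attached to the early-return points quoted above (messages are illustrative; all identifiers come from the hunk):

```java
final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
if (!segmentMetadataIter.hasNext()) {
    // Each early return names its own reason, so a debug trace shows exactly
    // where the cleanup exited without re-reading the code.
    logger.debug("No remote log segments available for partition {}, skipping cleanup", topicIdPartition);
    return;
}

final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
if (!logOptional.isPresent()) {
    logger.debug("No UnifiedLog found for partition {}, skipping cleanup", topicIdPartition);
    return;
}
```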
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. 
+ return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + return; + } + + final long retentionSize = log.config().retentionSize; + final boolean checkSizeRetention = retentionSize > -1; + + final long retentionMs = log.config().retentionMs; + final boolean checkTimestampRetention = retentionMs > -1; + + // Iterate once + // - to build the log size of segments with base-offset < local-log-start-offset + // - to collect all the epochs of remote log segments + // These values can be cached and updated in RLMTask for this topic partition without computing in each + // iteration. But the logic can become little complex and need to cover different scenarios to avoid any + // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion. 
+ final Set<Integer> epochsSet = new HashSet<>(); + long totalSizeEarlierToLocalLogStartOffset = 0L; + while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + + if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) { + totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes(); + } + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(checkSizeRetention, retentionSize, log, totalSizeEarlierToLocalLogStartOffset); + Optional<RetentionTimeData> retentionTimeData = checkTimestampRetention + ? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs)) + : Optional.empty(); + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochEntryIterator.hasNext()) { + EpochEntry epochEntry = epochEntryIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch); + while (isSegmentDeleted && segmentsIterator.hasNext()) { + if (isCancelled() || !isLeader()) { + return; + } + + RemoteLogSegmentMetadata metadata = segmentsIterator.next(); + isSegmentDeleted = + remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) || + remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset()); + } + } + + // Remove the remote log segments whose segment-leader-epochs are lesser than the earliest-epoch known + // to the leader. This will remove the unreferenced segments in the remote storage. + Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry(); + if (earliestEpochEntryOptional.isPresent()) { + EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get(); + Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream().filter(x -> x < earliestEpochEntry.epoch).iterator(); + while (epochsToClean.hasNext()) { + int epoch = epochsToClean.next(); + Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + while (segmentsToBeCleaned.hasNext()) { + if (isCancelled() || !isLeader()) { + return; + } + // No need to update the log-start-offset even though the segment is deleted as these epochs/offsets are earlier to that value. 
+ remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentsToBeCleaned.next()); + } + } + } + + // Update log start offset with the computed value after retention cleanup is done + remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset)); + } + + private Optional<RetentionSizeData> buildRetentionSizeData(boolean checkSizeRetention, + long retentionSize, + UnifiedLog log, + long totalSizeEarlierToLocalLogStartOffset) { + if (checkSizeRetention) { + // This is the total size of segments in local log that have their base-offset > local-log-start-offset + // and size of the segments in remote storage which have their end-offset < local-log-start-offset. + long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset; + long remainingBreachedSize = totalSize - retentionSize; Review Comment: please add the following check. We don't want to construct a RetentionSizeData object if it is not required.
```java
if (totalSize > retentionSize) {
    long breachedSize = totalSize - retentionSize;
    return Optional.of(new RetentionSizeData(retentionSize, breachedSize));
}
```
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -2144,7 +2208,7 @@ case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason { var size = log.size toDelete.foreach { segment => size -= segment.size - log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " + + log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. Log size " + Review Comment: nit: s/Log size after deletion/Local log size after deletion
Asking so that the reader can disambiguate between the log size (which is tiered + local) and the local log size. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; Review Comment: Don't we need to restore the original value of remainingBreachedSize when remainingBreachedSize < 0?
May I suggest rewriting this entire predicate as:
```java
long segmentSize = metadata.segmentSizeInBytes();
remainingBreachedSize -= segmentSize;
if (remainingBreachedSize < 0) {
    remainingBreachedSize += segmentSize;
    return false;
}
return true;
```
Note that remainingBreachedSize is a member of the class, so you don't need to do `retentionSizeData.get().remainingBreachedSize`. Also, the earlier `if (retentionSizeData.get().remainingBreachedSize > 0) {` is made redundant by the code I suggested. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch.
+ logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. 
+ final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + return; + } + + final long retentionSize = log.config().retentionSize; + final boolean checkSizeRetention = retentionSize > -1; + + final long retentionMs = log.config().retentionMs; + final boolean checkTimestampRetention = retentionMs > -1; + + // Iterate once + // - to build the log size of segments with base-offset < local-log-start-offset + // - to collect all the epochs of remote log segments + // These values can be cached and updated in RLMTask for this topic partition without computing in each + // iteration. But the logic can become little complex and need to cover different scenarios to avoid any + // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion. + final Set<Integer> epochsSet = new HashSet<>(); + long totalSizeEarlierToLocalLogStartOffset = 0L; + while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + + if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) { Review Comment: We are assuming that the state of the local log will remain the same from this point until the time we use the information computed here (i.e., `totalSizeEarlierToLocalLogStartOffset`) to delete the segments. But that is not true, since the local retention threads run concurrently and might have moved the localLogStartOffset by the time we use the `totalSizeEarlierToLocalLogStartOffset` computed here. As an example:

### Time instant: T1
LocalLSO = 10
LSO = 0
LSE = 20
TieredEO = 15

In this case we will calculate `totalSizeEarlierToLocalLogStartOffset` as the size from 0-10.

### Time instant: T2
The local log retention thread deletes some segments and updates LocalLSO=14.

### Time instant: T3
When we calculate `long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;` in `buildRetentionSizeData`, validLocalLogSegmentsSize returns the data from 14-20, and we say that the total size = totalSizeEarlierToLocalLogStartOffset (i.e. 0-10) + validLocalLogSegmentsSize (i.e. 14-20). This leads to the data from 11-13 not being counted anywhere. This looks like a bug! We need to re-use the values stored at the beginning of the retention calculation; otherwise other threads (the local retention threads) may change the values behind the scenes. Thoughts? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { Review Comment: even if we are not the leader at this stage, we have already deleted the logs in remote storage. Shouldn't we still update the metadata?
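For concreteness, the variant this question points at would drop the leadership guard so that the already-completed remote deletion is always reflected; this is only a sketch of the alternative being discussed, not a claim that it is safe:

```java
public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
    // Hypothetical: update unconditionally, since the remote segments were already
    // deleted by the time this is called, even if leadership was lost in between.
    logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
    updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
}
```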
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long, locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) + updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) + if (!remoteLogEnabled()) + logStartOffset = localLogStartOffset() maybeIncrementFirstUnstableOffset() initializeTopicId() logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset) + + info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " + + s"log end offset $logEndOffset") } def setLogOffsetsListener(listener: LogOffsetsListener): Unit = { logOffsetsListener = listener } + private def updateLocalLogStartOffset(offset: Long): Unit = { + _localLogStartOffset = offset + + if (highWatermark < offset) { + updateHighWatermark(offset) + } + + if (this.recoveryPoint < offset) { + localLog.updateRecoveryPoint(offset) + } + } + + def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = { + if (!remoteLogEnabled()) { + info("Ignoring the call as the remote log storage is disabled") Review Comment: this should probably be an error-level log, because we don't expect this method to be called when remote storage is disabled. Isn't that right? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -618,6 +625,230 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + if (isLeader()) { + logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + } + + class RemoteLogRetentionHandler { + + private final Optional<RetentionSizeData> retentionSizeData; + private final Optional<RetentionTimeData> retentionTimeData; + + private long remainingBreachedSize; + + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) { + this.retentionSizeData = retentionSizeData; + this.retentionTimeData = retentionTimeData; + remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionSizeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size >= 0 + if (retentionSizeData.get().remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention size {} breach.
Log size after deletion will be {}.", + metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); + } + return isSegmentDeleted; + } + + public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (!retentionTimeData.isPresent()) { + return false; + } + + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, + x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals + // are ascending with in an epoch. + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", + metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted && retentionSizeData.isPresent()) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. + return isSegmentDeleted; + } + + // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as + // unreferenced because they are not part of the current leader epoch lineage. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", + metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); + } + + // No need to update the log-start-offset as these epochs/offsets are earlier to that value. + return isSegmentDeleted; + } + + private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) + throws RemoteStorageException, ExecutionException, InterruptedException { + if (predicate.test(segmentMetadata)) { + // Publish delete segment started event. + remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + + // Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + + // Publish delete segment finished event. 
+ remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); + return true; + } else { + return false; + } + } + + } + + private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + if (isCancelled() || !isLeader()) { + logger.info("Returning from remote log segments cleanup as the task state is changed"); + return; + } + + // Cleanup remote log segments and update the log start offset if applicable. + final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + if (!segmentMetadataIter.hasNext()) { + return; + } + + final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition()); + if (!logOptional.isPresent()) { + return; + } + + final UnifiedLog log = logOptional.get(); + final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache(); + if (leaderEpochCacheOption.isEmpty()) { + return; + } + + final long retentionSize = log.config().retentionSize; + final boolean checkSizeRetention = retentionSize > -1; + + final long retentionMs = log.config().retentionMs; + final boolean checkTimestampRetention = retentionMs > -1; + + // Iterate once + // - to build the log size of segments with base-offset < local-log-start-offset + // - to collect all the epochs of remote log segments + // These values can be cached and updated in RLMTask for this topic partition without computing in each + // iteration. But the logic can become little complex and need to cover different scenarios to avoid any + // leaks. We can have a followup to improve it by maintaining these values through both copying and deletion. + final Set<Integer> epochsSet = new HashSet<>(); + long totalSizeEarlierToLocalLogStartOffset = 0L; + while (segmentMetadataIter.hasNext()) { + RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); + epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); + + if (checkSizeRetention && segmentMetadata.endOffset() < log.localLogStartOffset()) { + totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes(); + } + } + + // All the leader epochs in sorted order that exists in remote storage + final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet); + Collections.sort(remoteLeaderEpochs); + + Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(checkSizeRetention, retentionSize, log, totalSizeEarlierToLocalLogStartOffset); + Optional<RetentionTimeData> retentionTimeData = checkTimestampRetention + ? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs)) + : Optional.empty(); + RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); + + LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get(); + Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator(); + boolean isSegmentDeleted = true; + while (isSegmentDeleted && epochEntryIterator.hasNext()) { + EpochEntry epochEntry = epochEntryIterator.next(); + Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch); Review Comment: Sorry, I am a bit confused here. 
Earlier in the comment https://github.com/apache/kafka/pull/13561#discussion_r1181526976 you mentioned that retention size/time configuration applies across all epochs. i.e. if I say retention is 3GB and the total log as per current epoch is 2 GB, but the total data stored in remote +local = 7GB, then I will delete (7-3) = 4GB of data as part of this cleanup. Is my understanding correct? If yes, then we seem to be deleting only the current leadership chain here BUT we are using the breached size from ALL the epochs calculated earlier. Isn't this contradictory? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -595,6 +609,193 @@ public void run() { } } + public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { + logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); + updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); + } + + class RemoteLogRetentionHandler { + + private long remainingBreachedSize = 0L; + private OptionalLong logStartOffset = OptionalLong.empty(); + + public RemoteLogRetentionHandler(long remainingBreachedSize) { + this.remainingBreachedSize = remainingBreachedSize; + } + + private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { + // Assumption that segments contain size > 0 + if (checkSizeRetention && remainingBreachedSize > 0) { + remainingBreachedSize -= x.segmentSizeInBytes(); + return remainingBreachedSize >= 0; + } else return false; + }); + if (isSegmentDeleted) { + logStartOffset = OptionalLong.of(metadata.endOffset() + 1); + logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + + "${log.config.retentionSize} breach. Log size after deletion will be " + + "${remainingBreachedSize + log.config.retentionSize}."); + } + return isSegmentDeleted; + } + + private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) + throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); + if (isSegmentDeleted) { + remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); + logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); + } + + // No need to update the logStartOffset. + return isSegmentDeleted; + } + + // There are two cases: + // 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the + // leader-epoch gets bumped but the log-start-offset gets truncated back to 0. + // 2) To remove the unreferenced segments. + private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { + boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); + if (isSegmentDeleted) { + logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########
@@ -595,6 +609,193 @@ public void run() { } }

+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private long remainingBreachedSize = 0L;
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(long remainingBreachedSize) {
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size > 0
+                if (checkSizeRetention && remainingBreachedSize > 0) {
+                    remainingBreachedSize -= x.segmentSizeInBytes();
+                    return remainingBreachedSize >= 0;
+                } else return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), log.config().retentionSize, remainingBreachedSize + log.config().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // There are two cases:
+        // 1) When there are offline partitions and a new replica with an empty disk is brought up as leader, then the
+        //    leader-epoch gets bumped but the log-start-offset gets truncated back to 0.
+        // 2) To remove the unreferenced segments.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                           boolean checkTimestampRetention,
+                                                           long cleanupTs,
+                                                           long retentionMs)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> checkTimestampRetention && x.maxTimestampMs() <= cleanupTs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time " +
+                        "{}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+    }
+
+    private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (isCancelled())
+            return;
+
+        // Clean up remote log segments and update the log start offset if applicable.
+        final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+        if (!segmentMetadataIter.hasNext()) {
+            return;
+        }
+
+        final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+        if (!logOptional.isPresent()) {
+            return;
+        }
+
+        final UnifiedLog log = logOptional.get();
+        final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+        if (leaderEpochCacheOption.isEmpty()) {
+            return;
+        }
+
+        final long retentionMs = log.config().retentionMs;
+
+        // Iterate once
+        // - to build the log size of segments with base-offset < local-log-start-offset
+        // - to collect all the epochs of remote log segments
+        // These values can be cached and updated in RLMTask for this topic partition without computing them in each
+        // iteration. But the logic can become a little complex and needs to cover different scenarios to avoid any
+        // leaks. We can have a follow-up to improve it by maintaining these values through both copying and deletion.
+        final Set<Integer> epochsSet = new HashSet<>();
+        long totalSizeEarlierToLocalLogStartOffset = 0L;
+        while (segmentMetadataIter.hasNext()) {
+            RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+            epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+            if (segmentMetadata.endOffset() < log.localLogStartOffset()) {
+                totalSizeEarlierToLocalLogStartOffset += segmentMetadata.segmentSizeInBytes();
+            }
+        }
+
+        // This is the total size of the local log segments with base-offset > local-log-start-offset,
+        // plus the size of the segments in remote storage with end-offset < local-log-start-offset.
+        long totalSize = log.validLocalLogSegmentsSize() + totalSizeEarlierToLocalLogStartOffset;
+
+        // All the leader epochs, in sorted order, that exist in remote storage
+        List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+        Collections.sort(remoteLeaderEpochs);
+
+        long remainingBreachedSize = totalSize - log.config().retentionSize;
+
+        RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(remainingBreachedSize);
+
+        boolean checkTimestampRetention = retentionMs > -1;
+        long cleanupTs = time.milliseconds() - retentionMs;
+        boolean checkSizeRetention = log.config().retentionSize > -1;
+        LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
+        Iterator<EpochEntry> epochEntryIterator = leaderEpochCache.epochEntries().iterator();
+        boolean isSegmentDeleted = true;
+        while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+            EpochEntry epochEntry = epochEntryIterator.next();
+            Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochEntry.epoch);
+            while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                RemoteLogSegmentMetadata metadata = segmentsIterator.next();
+                isSegmentDeleted =

Review Comment: This comment has been addressed in the latest code.
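One note for readers following the `deleteRemoteLogSegment` method in the diff above: the physical deletion is bracketed by DELETE_SEGMENT_STARTED and DELETE_SEGMENT_FINISHED metadata events, so a crash between the two leaves a detectable marker. A stripped-down sketch of that two-phase pattern, assuming hypothetical `MetadataLog` and `SegmentStore` interfaces (stand-ins for illustration, not Kafka APIs):

```java
// Hedged sketch of a two-phase (started/finished) delete protocol.
import java.util.function.Predicate;

interface MetadataLog { void record(String segmentId, String state) throws Exception; }
interface SegmentStore { void delete(String segmentId) throws Exception; }

class TwoPhaseDelete {
    static boolean delete(String segmentId, Predicate<String> shouldDelete,
                          MetadataLog metadataLog, SegmentStore store) throws Exception {
        if (!shouldDelete.test(segmentId)) {
            return false;
        }
        // Phase 1: record intent so an interrupted delete is detectable on recovery.
        metadataLog.record(segmentId, "DELETE_SEGMENT_STARTED");
        // Physical deletion from remote storage; the broker may crash here.
        store.delete(segmentId);
        // Phase 2: record completion; the segment's metadata can now be dropped.
        metadataLog.record(segmentId, "DELETE_SEGMENT_FINISHED");
        return true;
    }
}
```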
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ##########
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,

   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
     logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }

   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }

+  private def updateLocalLogStartOffset(offset: Long): Unit = {

Review Comment: Should we ensure that we have acquired the partition `lock` first?
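For illustration, this is the kind of lock-guarded update the question refers to, sketched in Java with a hypothetical `ReentrantLock` standing in for the partition `lock` (not UnifiedLog's actual Scala implementation):

```java
// Hypothetical illustration of guarding an offset update with a partition lock.
import java.util.concurrent.locks.ReentrantLock;

class PartitionOffsets {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile long localLogStartOffset = 0L;

    // Holding the lock makes the read-compare-write atomic with respect to other
    // mutations (e.g. truncation) that acquire the same lock.
    void updateLocalLogStartOffset(long offset) {
        lock.lock();
        try {
            if (offset > localLogStartOffset) {
                localLogStartOffset = offset;
            }
        } finally {
            lock.unlock();
        }
    }
}
```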