malletgu commented on code in PR #16959: URL: https://github.com/apache/kafka/pull/16959#discussion_r1727172794
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########
@@ -1254,6 +1255,13 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
                 canProcess = false;
                 continue;
             }
+
+            if (RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state())) {
+                // If the state is COPY_SEGMENT_STARTED and it's not under copying process, this must be the previously
+                // failed copied state. We should clean it up directly.
+                danglingSegments.add(metadata);
+                continue;

Review Comment:
I think there is a risk of a race condition here: if `segmentIdsBeingCopied` is updated during the for loop, a segment that was initially being copied could be deleted by mistake. For the race condition to happen, we would need:
- When the list of segments is created at L1244, a segment is currently being copied and present in `segmentIdsBeingCopied`.
- During the for loop, the segment copy finishes and its id is removed from `segmentIdsBeingCopied`.
- L1259 `if (RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state()))` can now be reached with state `COPY_SEGMENT_STARTED` for that segment, because it has been removed from `segmentIdsBeingCopied`. That would make it look dangling and cause its deletion.
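The hazard described above can be sketched outside Kafka. This is a minimal, self-contained illustration with hypothetical names (`SegmentId`, `findDangling`), not the actual `RemoteLogManager` API: if the dangling check consults an immutable snapshot of `segmentIdsBeingCopied` taken before the segment list is built, a copy that finishes mid-loop cannot retroactively make its segment look dangling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DanglingSegmentRaceSketch {
    // Hypothetical stand-in for RemoteLogSegmentId; illustrative only.
    record SegmentId(String id) {}

    static List<SegmentId> findDangling(List<SegmentId> copyStartedSegments,
                                        Set<SegmentId> beingCopiedSnapshot) {
        List<SegmentId> dangling = new ArrayList<>();
        for (SegmentId s : copyStartedSegments) {
            // A COPY_SEGMENT_STARTED segment is treated as dangling only if it
            // was not in-flight when the snapshot was taken.
            if (!beingCopiedSnapshot.contains(s)) {
                dangling.add(s);
            }
        }
        return dangling;
    }

    public static void main(String[] args) {
        Set<SegmentId> segmentIdsBeingCopied = ConcurrentHashMap.newKeySet();
        SegmentId inFlight = new SegmentId("seg-1");
        SegmentId stale = new SegmentId("seg-2");
        segmentIdsBeingCopied.add(inFlight);

        // Snapshot taken BEFORE listing segments (analogous to L1244).
        Set<SegmentId> snapshot = Set.copyOf(segmentIdsBeingCopied);

        // Simulate the copy of seg-1 finishing during the cleanup loop.
        segmentIdsBeingCopied.remove(inFlight);

        // Only seg-2 is classified as dangling; seg-1 survives.
        System.out.println(findDangling(List.of(inFlight, stale), snapshot));
    }
}
```

With the live set instead of the snapshot, the same sequence would classify `seg-1` as dangling, which is exactly the mistaken deletion the comment warns about.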
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########
@@ -1374,10 +1387,17 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
                 while (segmentsIterator.hasNext()) {
                     RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
-                    RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
-                    if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
-                        remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
-                        visitedSegmentIds.add(segmentId);
+                    // Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
+                    // because "COPY_SEGMENT_STARED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete completed.
+                    // Note: there might be some "COPY_SEGMENT_STARED" segments not counted here, but become "COPY_SEGMENT_FINISHED" soon.
+                    // It's fine because the missed segment size will be count in next time, and it won't cause more segment deletion.

Review Comment:
```suggestion
                    // Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
                    // because "COPY_SEGMENT_STARTED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
                    // Note: there might be some "COPY_SEGMENT_STARTED" segments not counted here.
                    // Either they are being copied and will be counted next time or they are dangling and will be cleaned
                    // elsewhere.
                    // Either way, this won't cause more segment deletion.
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
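The accounting rule in the suggested comment can be illustrated with a small, self-contained sketch. The `Segment` and `State` types and the `remoteLogSizeBytes` helper below are illustrative stand-ins, not Kafka's actual classes: only fully copied segments, including those whose deletion has started but not yet finished, contribute to the remote log size, and each segment id is counted at most once across epochs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoteSizeSketch {
    // Illustrative stand-ins for Kafka's segment states and metadata.
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }
    record Segment(String id, State state, long sizeBytes) {}

    static long remoteLogSizeBytes(List<Segment> segments) {
        long total = 0;
        Set<String> visited = new HashSet<>();
        for (Segment s : segments) {
            // Count COPY_SEGMENT_FINISHED (copy complete) and DELETE_SEGMENT_STARTED
            // (still on remote storage); skip incomplete copies and finished deletes.
            boolean countable = s.state() == State.COPY_SEGMENT_FINISHED
                             || s.state() == State.DELETE_SEGMENT_STARTED;
            // visited de-duplicates segments listed under multiple leader epochs.
            if (countable && visited.add(s.id())) {
                total += s.sizeBytes();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Segment> segs = List.of(
            new Segment("a", State.COPY_SEGMENT_FINISHED, 100),
            new Segment("a", State.COPY_SEGMENT_FINISHED, 100), // duplicate across epochs: counted once
            new Segment("b", State.COPY_SEGMENT_STARTED, 50),   // incomplete copy: skipped
            new Segment("c", State.DELETE_SEGMENT_STARTED, 30), // delete in progress: still counted
            new Segment("d", State.DELETE_SEGMENT_FINISHED, 20) // already deleted: skipped
        );
        System.out.println(remoteLogSizeBytes(segs)); // prints 130
    }
}
```

Skipping `COPY_SEGMENT_STARTED` segments only ever under-counts the remote size, so, as the suggested comment says, it cannot trigger extra retention-based deletion; the size is picked up on a later pass once the copy finishes.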