malletgu commented on code in PR #16959:
URL: https://github.com/apache/kafka/pull/16959#discussion_r1727172794


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1254,6 +1255,13 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException
                         canProcess = false;
                         continue;
                     }
+
+                    if (RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state())) {
+                        // If the state is COPY_SEGMENT_STARTED and it's not under copying process, this must be the previously
+                        // failed copied state. We should clean it up directly.
+                        danglingSegments.add(metadata);
+                        continue;
Review Comment:
   I think we have a risk of a race condition here if `segmentIdsBeingCopied` is updated during the for loop, which could cause a segment that was initially being copied to be deleted by mistake.
   For the race condition to happen, we would need:
   - When the list of segments is created at L1244, a segment is currently being copied and present in `segmentIdsBeingCopied`.
   - During the for loop, the segment copy finishes and its id is removed from `segmentIdsBeingCopied`.
   - L1259 `if (RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state()))` can now be reached with state `COPY_SEGMENT_STARTED` for that segment, because it has been removed from `segmentIdsBeingCopied`, which would make it look dangling and cause its deletion.
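A common way to close this kind of window is to snapshot the in-flight set once before iterating, so removals that happen mid-loop cannot reclassify a segment. The following is a hypothetical, simplified sketch of that idea (the `String` ids, `findDangling` helper, and `DanglingSegmentSketch` class are illustrative stand-ins, not the Kafka code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the race described above: if the cleaner re-reads the
// live "being copied" set inside the loop, a segment whose copy finishes
// mid-iteration can look dangling. Taking a snapshot before iterating
// removes that window.
public class DanglingSegmentSketch {
    // Live set updated concurrently by the copy path
    // (stand-in for segmentIdsBeingCopied).
    static final Set<String> segmentIdsBeingCopied = ConcurrentHashMap.newKeySet();

    // Returns the ids that this pass may treat as dangling
    // COPY_SEGMENT_STARTED leftovers.
    static List<String> findDangling(List<String> copyStartedSegments) {
        // Snapshot once, up front: later removals from the live set no longer
        // affect this cleanup pass, so an in-flight copy cannot be
        // misclassified as dangling.
        Set<String> snapshot = new HashSet<>(segmentIdsBeingCopied);
        return copyStartedSegments.stream()
                .filter(id -> !snapshot.contains(id))
                .toList();
    }

    public static void main(String[] args) {
        segmentIdsBeingCopied.add("seg-1"); // copy in progress
        List<String> dangling = findDangling(List.of("seg-1", "seg-2"));
        // Even if seg-1's copy finishes (and its id is removed) after the
        // snapshot, this pass still treats it as in-flight; it will simply be
        // re-checked on the next run.
        segmentIdsBeingCopied.remove("seg-1");
        System.out.println(dangling); // [seg-2]
    }
}
```

A segment that finishes copying after the snapshot is merely deferred to the next cleanup pass, which is safe; the reverse misclassification (deleting an in-flight copy) is what the snapshot prevents.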



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1374,10 +1387,17 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
                     Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
                     while (segmentsIterator.hasNext()) {
                         RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
-                        RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
-                        if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
-                            remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
-                            visitedSegmentIds.add(segmentId);
+                        // Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
+                        // because "COPY_SEGMENT_STARED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete completed.
+                        // Note: there might be some "COPY_SEGMENT_STARED" segments not counted here, but become "COPY_SEGMENT_FINISHED" soon.
+                        // It's fine because the missed segment size will be count in next time, and it won't cause more segment deletion.

Review Comment:
   ```suggestion
                           // Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
                           // because "COPY_SEGMENT_STARTED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
                           // Note: there might be some "COPY_SEGMENT_STARTED" segments not counted here.
                           // Either they are being copied and will be counted next time or they are dangling and will be cleaned
                           // elsewhere.
                           // Either way, this won't cause more segment deletion.
   ```
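The counting rule the suggested comment describes can be sketched as a simple state filter. This is a hypothetical, self-contained illustration: the enum constants mirror `RemoteLogSegmentState`, but the `Segment` record and `remoteLogSizeBytes` helper are illustrative stand-ins, not the Kafka API.

```java
import java.util.List;

// Sketch of the size-counting rule: only COPY_SEGMENT_FINISHED and
// DELETE_SEGMENT_STARTED segments contribute to the remote log size used for
// retention decisions.
public class RetentionSizeSketch {
    enum State {
        COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED,
        DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
    }

    record Segment(String id, State state, long sizeInBytes) {}

    static long remoteLogSizeBytes(List<Segment> segments) {
        return segments.stream()
                // COPY_SEGMENT_STARTED: copy may have failed or still be in
                // flight, so skip it; it is either counted on a later pass or
                // cleaned up as dangling elsewhere.
                // DELETE_SEGMENT_FINISHED: the data is already gone, so skip.
                .filter(s -> s.state() == State.COPY_SEGMENT_FINISHED
                          || s.state() == State.DELETE_SEGMENT_STARTED)
                .mapToLong(Segment::sizeInBytes)
                .sum();
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment("a", State.COPY_SEGMENT_FINISHED, 100),
                new Segment("b", State.COPY_SEGMENT_STARTED, 50),    // not counted yet
                new Segment("c", State.DELETE_SEGMENT_STARTED, 30),
                new Segment("d", State.DELETE_SEGMENT_FINISHED, 70)); // already deleted
        System.out.println(remoteLogSizeBytes(segments)); // 130
    }
}
```

Undercounting an in-flight copy is conservative here: it can only delay retention-based deletion until the next pass, never trigger extra deletion.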



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
