kamalcph commented on code in PR #21213:
URL: https://github.com/apache/kafka/pull/21213#discussion_r2664263295


##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -1460,26 +1506,32 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
                         // "DELETE_SEGMENT_FINISHED" means deletion completed, so there is nothing to count.
                         if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
                             RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
-                            if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
-                                remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
+                            if (!visitedSegmentIds.contains(segmentId)) {
                                 visitedSegmentIds.add(segmentId);
+                                boolean isValid = isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries);
+                                if (isValid) {
+                                    remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
+                                } else {
+                                    isAllValid = false;
+                                }
                             }
                         }
                     }
                 }
-
+                this.isAllSegmentsValid = isAllValid && fullCopyFinishedSegmentsSizeInBytes == remoteLogSizeBytes;
                 brokerTopicStats.recordRemoteLogSizeComputationTime(topicIdPartition.topic(), topicIdPartition.partition(), time.milliseconds() - startTimeMs);
-
-                // This is the total size of segments in local log that have their base-offset > local-log-start-offset
-                // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
-                long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
-                if (totalSize > retentionSize) {
-                    long remainingBreachedSize = totalSize - retentionSize;
-                    RetentionSizeData retentionSizeData = new RetentionSizeData(retentionSize, remainingBreachedSize);
-                    return Optional.of(retentionSizeData);
-                }
+            } else {
+                // Once all the segments are valid, then the future segments to be uploaded by this leader are also valid.
+                remoteLogSizeBytes = fullCopyFinishedSegmentsSizeInBytes;

Review Comment:
   Updated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to