showuon commented on code in PR #21213:
URL: https://github.com/apache/kafka/pull/21213#discussion_r2663871506
##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -1460,26 +1506,32 @@ private Optional<RetentionSizeData>
buildRetentionSizeData(long retentionSize,
// "DELETE_SEGMENT_FINISHED" means deletion completed,
so there is nothing to count.
if
(segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
RemoteLogSegmentId segmentId =
segmentMetadata.remoteLogSegmentId();
- if (!visitedSegmentIds.contains(segmentId) &&
isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries))
{
- remoteLogSizeBytes +=
segmentMetadata.segmentSizeInBytes();
+ if (!visitedSegmentIds.contains(segmentId)) {
visitedSegmentIds.add(segmentId);
+ boolean isValid =
isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries);
+ if (isValid) {
+ remoteLogSizeBytes +=
segmentMetadata.segmentSizeInBytes();
+ } else {
+ isAllValid = false;
+ }
}
}
}
}
-
+ this.isAllSegmentsValid = isAllValid &&
fullCopyFinishedSegmentsSizeInBytes == remoteLogSizeBytes;
brokerTopicStats.recordRemoteLogSizeComputationTime(topicIdPartition.topic(),
topicIdPartition.partition(), time.milliseconds() - startTimeMs);
-
- // This is the total size of segments in local log that have
their base-offset > local-log-start-offset
- // and size of the segments in remote storage which have their
end-offset < local-log-start-offset.
- long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
- if (totalSize > retentionSize) {
- long remainingBreachedSize = totalSize - retentionSize;
- RetentionSizeData retentionSizeData = new
RetentionSizeData(retentionSize, remainingBreachedSize);
- return Optional.of(retentionSizeData);
- }
+ } else {
+ // Once all the segments are valid, then the future segments
to be uploaded by this leader are also valid.
+ remoteLogSizeBytes = fullCopyFinishedSegmentsSizeInBytes;
Review Comment:
Nice improvement! Should we update the metric for
remoteLogSizeComputationTime here?
##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java:
##########
@@ -2177,6 +2177,80 @@ public void testRemoteSizeData() {
}
}
+ @Test
+ public void testBuildRetentionSizeData() throws RemoteStorageException {
+ long retentionSize = 1000L;
+ long onlyLocalLogSegmentsSize = 500L;
+ long logEndOffset = 100L;
+ NavigableMap<Integer, Long> epochEntries = new TreeMap<>();
+ epochEntries.put(0, 0L);
+ long fullCopyFinishedSegmentsSizeInBytes = 1600L;
+ RemoteLogManager.RLMExpirationTask expirationTask =
remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+ assertFalse(expirationTask.isAllSegmentsValid());
+
+ // 1. retentionSize < 0
+ Optional<RemoteLogManager.RetentionSizeData> result = expirationTask
+ .buildRetentionSizeData(-1L, onlyLocalLogSegmentsSize,
logEndOffset, epochEntries, fullCopyFinishedSegmentsSizeInBytes);
+ assertFalse(result.isPresent());
+ assertFalse(expirationTask.isAllSegmentsValid());
+
+ // 2. When (onlyLocalLogSegmentsSize +
fullCopyFinishedSegmentsSizeInBytes) <= configure-retention-size
+ result = expirationTask
+ .buildRetentionSizeData(-1L, onlyLocalLogSegmentsSize,
logEndOffset, epochEntries, 500L);
Review Comment:
This test case is not supposed to pass `retentionSize=-1`, right?
##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java:
##########
@@ -2177,6 +2177,80 @@ public void testRemoteSizeData() {
}
}
+ @Test
+ public void testBuildRetentionSizeData() throws RemoteStorageException {
+ long retentionSize = 1000L;
+ long onlyLocalLogSegmentsSize = 500L;
+ long logEndOffset = 100L;
+ NavigableMap<Integer, Long> epochEntries = new TreeMap<>();
+ epochEntries.put(0, 0L);
+ long fullCopyFinishedSegmentsSizeInBytes = 1600L;
+ RemoteLogManager.RLMExpirationTask expirationTask =
remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+ assertFalse(expirationTask.isAllSegmentsValid());
+
+ // 1. retentionSize < 0
+ Optional<RemoteLogManager.RetentionSizeData> result = expirationTask
+ .buildRetentionSizeData(-1L, onlyLocalLogSegmentsSize,
logEndOffset, epochEntries, fullCopyFinishedSegmentsSizeInBytes);
+ assertFalse(result.isPresent());
+ assertFalse(expirationTask.isAllSegmentsValid());
+
+ // 2. When (onlyLocalLogSegmentsSize +
fullCopyFinishedSegmentsSizeInBytes) <= configure-retention-size
+ result = expirationTask
+ .buildRetentionSizeData(-1L, onlyLocalLogSegmentsSize,
logEndOffset, epochEntries, 500L);
+ assertFalse(result.isPresent());
+ assertFalse(expirationTask.isAllSegmentsValid());
+
+
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition),
anyInt()))
+ .thenReturn(Collections.emptyIterator());
+
+ // 3. totalSize <= retentionSize
+ // totalSize = 500 (local) + 0 (remote, as listRemoteLogSegments
returns empty) = 500. retentionSize = 1000.
Review Comment:
nit: The comment for the 3rd test case should come before the mock for
`listRemoteLogSegments` at L2203.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]