kamalcph commented on code in PR #21612:
URL: https://github.com/apache/kafka/pull/21612#discussion_r2870844762
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1944,25 +1965,29 @@ private int deleteSegments(List<LogSegment> deletable,
SegmentDeletionReason rea
* not deletion is enabled, delete any local log segments that are before
the log start offset
*/
public int deleteOldSegments() throws IOException {
+ int deletedSegments;
if (config().delete) {
- return deleteLogStartOffsetBreachedSegments() +
+ deletedSegments = deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
} else if (config().compact) {
- return deleteLogStartOffsetBreachedSegments();
+ deletedSegments = deleteLogStartOffsetBreachedSegments();
} else {
// If cleanup.policy is empty and remote storage is enabled, the
local log segments will
// be cleaned based on the values of log.local.retention.bytes and
log.local.retention.ms
if (remoteLogEnabledAndRemoteCopyEnabled()) {
- return deleteLogStartOffsetBreachedSegments() +
+ deletedSegments = deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
} else {
// If cleanup.policy is empty and remote storage is disabled,
we should not delete any local log segments
// unless the log start offset advances through deleteRecords
- return deleteLogStartOffsetBreachedSegments();
+ deletedSegments = deleteLogStartOffsetBreachedSegments();
}
}
+ // To save CPU cycles, calculate retentionSizeInPercent only when the
log-cleaner thread runs
+ retentionSizeInPercentValue.set(calculateRetentionSizeInPercent());
Review Comment:
Can we run this in a `finally` block to ensure that the size is still calculated
when log deletion fails with an error? Also, please cover this scenario with a test.
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -747,4 +753,101 @@ public static MemoryRecords records(List<SimpleRecord>
records,
}
return builder.build();
}
+
+ /**
+ * Test RetentionSizeInPercent metric for regular (non-tiered) topics.
+ * The metric should only be reported for non-tiered topics with
size-based retention configured.
+ *
+ * @param remoteLogStorageEnable whether remote log storage is enabled
+ * @param remoteLogCopyDisable whether remote log copy is disabled (only
relevant when remote storage is enabled)
+ * @param expectedSizeInPercent expected percentage value after retention
cleanup
+ */
+ @ParameterizedTest
+ @CsvSource({
+ // Remote storage enabled with copy enabled: metric handled by
RemoteLogManager, returns 0 here
+ "true, false, 0",
+ // Remote storage enabled but copy disabled: metric should be
calculated (100%)
+ "true, true, 100",
+ // Remote storage disabled: metric should be calculated (100%)
+ "false, false, 100",
+ // Remote storage disabled (remoteLogCopyDisable is ignored): metric
should be calculated (100%)
+ "false, true, 100"
+ })
+ public void testRetentionSizeInPercentMetric(boolean
remoteLogStorageEnable, boolean remoteLogCopyDisable, int
expectedSizeInPercent) throws IOException {
+ Supplier<MemoryRecords> records = () ->
singletonRecords("test".getBytes());
+ int recordSize = records.get().sizeInBytes();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(recordSize * 5)
+ .retentionBytes(recordSize * 10L)
+ .remoteLogStorageEnable(remoteLogStorageEnable)
+ .remoteLogCopyDisable(remoteLogCopyDisable)
+ .build();
+ log = createLog(logDir, logConfig, true);
+
+ String metricName = "name=RetentionSizeInPercent,topic=" +
log.topicPartition().topic() +
+ ",partition=" + log.topicPartition().partition();
+
+ // Append some messages to create 3 segments (15 records / 5 records
per segment = 3 segments)
+ for (int i = 0; i < 15; i++) {
+ log.appendAsLeader(records.get(), 0);
+ }
+
+ // Before deletion, calculate what the percentage should be
+ // Total size = 15 * recordSize, retention = 10 * recordSize
+ // Percentage = (15 * 100) / 10 = 150% (for non-tiered topics)
+ if (!remoteLogStorageEnable || remoteLogCopyDisable) {
+ assertEquals(150, log.calculateRetentionSizeInPercent());
+ }
+
+ log.updateHighWatermark(log.logEndOffset());
+ // For tiered storage tests, simulate remote storage having the data
+ if (remoteLogStorageEnable) {
+ log.updateHighestOffsetInRemoteStorage(9);
+ }
+ log.deleteOldSegments();
+
+ // After deletion: log size should be ~10 * recordSize (2 segments),
retention = 10 * recordSize
+ // Percentage = (10 * 100) / 10 = 100% (for non-tiered topics)
+ // Verify via Yammer metric (JMX)
+ assertEquals(expectedSizeInPercent, yammerMetricValue(metricName));
+ assertEquals(2, log.numberOfSegments(), "should have 2 segments after
deletion");
+ }
+
+ @Test
+ public void testRetentionSizeInPercentWithZeroRetention() throws
IOException {
Review Comment:
nit: rename testRetentionSizeInPercentWithZeroRetention to either
testRetentionSizeInPercentWithNegativeRetention or
testRetentionSizeInPercentWithInfiniteRetention
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java:
##########
@@ -24,6 +24,7 @@ public class LogMetricNames {
public static final String LOG_START_OFFSET = "LogStartOffset";
public static final String LOG_END_OFFSET = "LogEndOffset";
public static final String SIZE = "Size";
+ public static final String RETENTION_SIZE_IN_PERCENT =
"RetentionSizeInPercent";
- public static final List<String> ALL_METRIC_NAMES =
List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE);
+ public static final List<String> ALL_METRIC_NAMES =
List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE,
RETENTION_SIZE_IN_PERCENT);
Review Comment:
nit: fold this line to keep the line width within 120 chars.
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -747,4 +753,101 @@ public static MemoryRecords records(List<SimpleRecord>
records,
}
return builder.build();
}
+
+ /**
+ * Test RetentionSizeInPercent metric for regular (non-tiered) topics.
+ * The metric should only be reported for non-tiered topics with
size-based retention configured.
+ *
+ * @param remoteLogStorageEnable whether remote log storage is enabled
+ * @param remoteLogCopyDisable whether remote log copy is disabled (only
relevant when remote storage is enabled)
+ * @param expectedSizeInPercent expected percentage value after retention
cleanup
+ */
+ @ParameterizedTest
+ @CsvSource({
+ // Remote storage enabled with copy enabled: metric handled by
RemoteLogManager, returns 0 here
+ "true, false, 0",
+ // Remote storage enabled but copy disabled: metric should be
calculated (100%)
+ "true, true, 100",
+ // Remote storage disabled: metric should be calculated (100%)
+ "false, false, 100",
+ // Remote storage disabled (remoteLogCopyDisable is ignored): metric
should be calculated (100%)
+ "false, true, 100"
+ })
+ public void testRetentionSizeInPercentMetric(boolean
remoteLogStorageEnable, boolean remoteLogCopyDisable, int
expectedSizeInPercent) throws IOException {
Review Comment:
nit: fold this line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]