see-quick commented on code in PR #21657:
URL: https://github.com/apache/kafka/pull/21657#discussion_r2894970898


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java:
##########
@@ -290,6 +299,390 @@ public void testIsThreadFailed() throws Exception {
         assertEquals(cleaner.cleaners().size(), cleaner.deadThreadCount());
     }
 
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCleanerCompaction(CompressionType compressionType) throws 
Exception {
+        Compression codec = Compression.of(compressionType).build();
+        int largeMessageKey = 20;
+        ValueAndRecords largeMessage = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, 
codec);
+        int maxMessageSize = largeMessage.records().sizeInBytes();
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+        List<KeyValueOffset> appends = writeDups(100, 3, theLog, codec);
+        long startSize = theLog.size();
+        cleaner.startup();
+
+        long firstDirty = theLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, firstDirty);
+        long compactedSize = 
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+        assertTrue(startSize > compactedSize,
+            "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
+
+        checkLogAfterAppendingDups(theLog, startSize, appends);
+
+        LogAppendInfo appendInfo = 
theLog.appendAsLeader(largeMessage.records(), 0);
+        // move LSO forward to increase compaction bound
+        theLog.updateHighWatermark(theLog.logEndOffset());
+        long largeMessageOffset = appendInfo.firstOffset();
+
+        List<KeyValueOffset> dups = writeDups(100, 3, theLog, codec, 
largeMessageKey + 1, RecordBatch.CURRENT_MAGIC_VALUE);
+        List<KeyValueOffset> appends2 = new ArrayList<>(appends);
+        appends2.add(new KeyValueOffset(largeMessageKey, largeMessage.value(), 
largeMessageOffset));
+        appends2.addAll(dups);
+        long firstDirty2 = theLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, firstDirty2);
+
+        checkLogAfterAppendingDups(theLog, startSize, appends2);
+
+        // simulate deleting a partition, by removing it from logs
+        // force a checkpoint
+        // and make sure its gone from checkpoint file
+        cleaner.logs().remove(TOPIC_PARTITIONS.get(0));
+        cleaner.updateCheckpoints(logDir, 
Optional.of(TOPIC_PARTITIONS.get(0)));
+        Map<TopicPartition, Long> checkpoints = new OffsetCheckpointFile(
+            new File(logDir, LogCleanerManager.OFFSET_CHECKPOINT_FILE), 
null).read();
+        // we expect partition 0 to be gone
+        assertFalse(checkpoints.containsKey(TOPIC_PARTITIONS.get(0)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCleansCombinedCompactAndDeleteTopic(CompressionType 
compressionType) throws Exception {
+        Properties logProps = new Properties();
+        int retentionMs = 100000;
+        logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs);
+        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
+
+        LogAndMessages result1 = runCleanerAndCheckCompacted(100, 
compressionType, logProps);
+        UnifiedLog theLog = result1.log();
+
+        // Set the last modified time to an old value to force deletion of old 
segments
+        long endOffset = theLog.logEndOffset();
+        for (LogSegment segment : theLog.logSegments()) {
+            segment.setLastModified(time.milliseconds() - (2L * retentionMs));
+        }
+        TestUtils.waitForCondition(
+            () -> theLog.logStartOffset() == endOffset && 
theLog.numberOfSegments() == 1,
+            "Timed out waiting for deletion of old segments");
+
+        cleaner.shutdown();
+        closeLog(theLog);
+
+        // run the cleaner again to make sure if there are no issues post 
deletion
+        LogAndMessages result2 = runCleanerAndCheckCompacted(20, 
compressionType, logProps);
+
+        List<KeyValueOffset> read = readFromLogFull(result2.log());
+        assertEquals(toMap(result2.messages()), toMap(read), "Contents of the 
map shouldn't change");
+    }
+
+    private LogAndMessages runCleanerAndCheckCompacted(int numKeys, 
CompressionType compressionType,
+                                                       Properties logProps) 
throws Exception {
+        cleaner = makeCleaner(TOPIC_PARTITIONS.subList(0, 1), logProps, 100L);
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+        List<KeyValueOffset> messages = writeDups(numKeys, 3, theLog, 
Compression.of(compressionType).build());
+        long startSize = theLog.size();
+
+        theLog.updateHighWatermark(theLog.logEndOffset());
+
+        long firstDirty = theLog.activeSegment().baseOffset();
+        cleaner.startup();
+
+        // should compact the log
+        checkLastCleaned("log", 0, firstDirty);
+        long compactedSize = 
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+        assertTrue(startSize > compactedSize,
+            "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
+        return new LogAndMessages(theLog, messages);
+    }
+
+    @ParameterizedTest
+    @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = "ZSTD")
+    public void testCleanerWithMessageFormatV0V1V2(CompressionType 
compressionType) throws Exception {
+        Compression compression = Compression.of(compressionType).build();
+        int largeMessageKey = 20;
+        ValueAndRecords largeMessage = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, 
compression);
+        int maxMessageSize;
+        if (compressionType == CompressionType.NONE) {
+            maxMessageSize = largeMessage.records().sizeInBytes();
+        } else {
+            // the broker assigns absolute offsets for message format 0 which 
potentially causes the compressed size to
+            // increase because the broker offsets are larger than the ones 
assigned by the client
+            // adding `6` to the message set size is good enough for this 
test: it covers the increased message size while
+            // still being less than the overhead introduced by the conversion 
from message format version 0 to 1
+            maxMessageSize = largeMessage.records().sizeInBytes() + 6;
+        }
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+        Properties props = logConfigProperties(maxMessageSize);
+        props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.LOG_APPEND_TIME.name);
+        LogConfig logConfig = new LogConfig(props);
+        theLog.updateConfig(logConfig);
+
+        List<KeyValueOffset> appends1 = writeDups(100, 3, theLog, compression, 
0, RecordBatch.MAGIC_VALUE_V0);
+        long startSize = theLog.size();
+        cleaner.startup();
+
+        long firstDirty = theLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, firstDirty);
+        long compactedSize = 
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+        assertTrue(startSize > compactedSize,
+            "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
+
+        checkLogAfterAppendingDups(theLog, startSize, appends1);
+
+        List<KeyValueOffset> dupsV0 = writeDups(40, 3, theLog, compression, 0, 
RecordBatch.MAGIC_VALUE_V0);
+        LogAppendInfo appendInfo = 
theLog.appendAsLeaderWithRecordVersion(largeMessage.records(), 0, 
RecordVersion.V0);
+        // move LSO forward to increase compaction bound
+        theLog.updateHighWatermark(theLog.logEndOffset());
+        long largeMessageOffset = appendInfo.firstOffset();
+
+        // also add some messages with version 1 and version 2 to check that 
we handle mixed format versions correctly
+        List<KeyValueOffset> dupsV1 = writeDups(40, 3, theLog, compression, 
30, RecordBatch.MAGIC_VALUE_V1);
+        List<KeyValueOffset> dupsV2 = writeDups(5, 3, theLog, compression, 15, 
RecordBatch.MAGIC_VALUE_V2);
+
+        Set<String> v0RecordKeysWithNoV1V2Updates = new HashSet<>();
+        Set<Integer> dupsV1Keys = new HashSet<>();
+        for (KeyValueOffset kvo : dupsV1) dupsV1Keys.add(kvo.key());
+        Set<Integer> dupsV2Keys = new HashSet<>();
+        for (KeyValueOffset kvo : dupsV2) dupsV2Keys.add(kvo.key());
+        for (KeyValueOffset kvo : appends1) {
+            if (!dupsV1Keys.contains(kvo.key()) && 
!dupsV2Keys.contains(kvo.key())) {
+                v0RecordKeysWithNoV1V2Updates.add(String.valueOf(kvo.key()));
+            }
+        }

Review Comment:
   I wonder if I can improve this a bit ... maybe this is a reason why I like 
functional languages ... :D Maybe using streams would be better?



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java:
##########
@@ -290,6 +299,390 @@ public void testIsThreadFailed() throws Exception {
         assertEquals(cleaner.cleaners().size(), cleaner.deadThreadCount());
     }
 
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCleanerCompaction(CompressionType compressionType) throws 
Exception {
+        Compression codec = Compression.of(compressionType).build();
+        int largeMessageKey = 20;
+        ValueAndRecords largeMessage = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, 
codec);
+        int maxMessageSize = largeMessage.records().sizeInBytes();
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+        List<KeyValueOffset> appends = writeDups(100, 3, theLog, codec);
+        long startSize = theLog.size();
+        cleaner.startup();
+
+        long firstDirty = theLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, firstDirty);
+        long compactedSize = 
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+        assertTrue(startSize > compactedSize,
+            "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
+
+        checkLogAfterAppendingDups(theLog, startSize, appends);
+
+        LogAppendInfo appendInfo = 
theLog.appendAsLeader(largeMessage.records(), 0);
+        // move LSO forward to increase compaction bound
+        theLog.updateHighWatermark(theLog.logEndOffset());
+        long largeMessageOffset = appendInfo.firstOffset();
+
+        List<KeyValueOffset> dups = writeDups(100, 3, theLog, codec, 
largeMessageKey + 1, RecordBatch.CURRENT_MAGIC_VALUE);
+        List<KeyValueOffset> appends2 = new ArrayList<>(appends);
+        appends2.add(new KeyValueOffset(largeMessageKey, largeMessage.value(), 
largeMessageOffset));
+        appends2.addAll(dups);
+        long firstDirty2 = theLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, firstDirty2);
+
+        checkLogAfterAppendingDups(theLog, startSize, appends2);
+
+        // simulate deleting a partition, by removing it from logs
+        // force a checkpoint
+        // and make sure its gone from checkpoint file
+        cleaner.logs().remove(TOPIC_PARTITIONS.get(0));
+        cleaner.updateCheckpoints(logDir, 
Optional.of(TOPIC_PARTITIONS.get(0)));
+        Map<TopicPartition, Long> checkpoints = new OffsetCheckpointFile(
+            new File(logDir, LogCleanerManager.OFFSET_CHECKPOINT_FILE), 
null).read();
+        // we expect partition 0 to be gone
+        assertFalse(checkpoints.containsKey(TOPIC_PARTITIONS.get(0)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCleansCombinedCompactAndDeleteTopic(CompressionType 
compressionType) throws Exception {
+        Properties logProps = new Properties();
+        int retentionMs = 100000;
+        logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs);
+        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
+
+        LogAndMessages result1 = runCleanerAndCheckCompacted(100, 
compressionType, logProps);
+        UnifiedLog theLog = result1.log();
+
+        // Set the last modified time to an old value to force deletion of old 
segments
+        long endOffset = theLog.logEndOffset();
+        for (LogSegment segment : theLog.logSegments()) {
+            segment.setLastModified(time.milliseconds() - (2L * retentionMs));
+        }
+        TestUtils.waitForCondition(
+            () -> theLog.logStartOffset() == endOffset && 
theLog.numberOfSegments() == 1,
+            "Timed out waiting for deletion of old segments");
+
+        cleaner.shutdown();
+        closeLog(theLog);
+
+        // run the cleaner again to make sure if there are no issues post 
deletion
+        LogAndMessages result2 = runCleanerAndCheckCompacted(20, 
compressionType, logProps);
+
+        List<KeyValueOffset> read = readFromLogFull(result2.log());
+        assertEquals(toMap(result2.messages()), toMap(read), "Contents of the 
map shouldn't change");
+    }
+
+    private LogAndMessages runCleanerAndCheckCompacted(int numKeys, 
CompressionType compressionType,
+                                                       Properties logProps) 
throws Exception {
+        cleaner = makeCleaner(TOPIC_PARTITIONS.subList(0, 1), logProps, 100L);
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+        List<KeyValueOffset> messages = writeDups(numKeys, 3, theLog, 
Compression.of(compressionType).build());
+        long startSize = theLog.size();
+
+        theLog.updateHighWatermark(theLog.logEndOffset());
+
+        long firstDirty = theLog.activeSegment().baseOffset();
+        cleaner.startup();
+
+        // should compact the log
+        checkLastCleaned("log", 0, firstDirty);
+        long compactedSize = 
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+        assertTrue(startSize > compactedSize,
+            "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
+        return new LogAndMessages(theLog, messages);
+    }
+
+    @ParameterizedTest
+    @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = "ZSTD")
+    public void testCleanerWithMessageFormatV0V1V2(CompressionType 
compressionType) throws Exception {
+        Compression compression = Compression.of(compressionType).build();
+        int largeMessageKey = 20;
+        ValueAndRecords largeMessage = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, 
compression);
+        int maxMessageSize;
+        if (compressionType == CompressionType.NONE) {
+            maxMessageSize = largeMessage.records().sizeInBytes();
+        } else {
+            // the broker assigns absolute offsets for message format 0 which 
potentially causes the compressed size to
+            // increase because the broker offsets are larger than the ones 
assigned by the client
+            // adding `6` to the message set size is good enough for this 
test: it covers the increased message size while
+            // still being less than the overhead introduced by the conversion 
from message format version 0 to 1
+            maxMessageSize = largeMessage.records().sizeInBytes() + 6;
+        }
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+
+        UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+        Properties props = logConfigProperties(maxMessageSize);
+        props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.LOG_APPEND_TIME.name);
+        LogConfig logConfig = new LogConfig(props);
+        theLog.updateConfig(logConfig);
+
+        List<KeyValueOffset> appends1 = writeDups(100, 3, theLog, compression, 
0, RecordBatch.MAGIC_VALUE_V0);
+        long startSize = theLog.size();
+        cleaner.startup();
+
+        long firstDirty = theLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, firstDirty);
+        long compactedSize = 
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+        assertTrue(startSize > compactedSize,
+            "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
+
+        checkLogAfterAppendingDups(theLog, startSize, appends1);
+
+        List<KeyValueOffset> dupsV0 = writeDups(40, 3, theLog, compression, 0, 
RecordBatch.MAGIC_VALUE_V0);
+        LogAppendInfo appendInfo = 
theLog.appendAsLeaderWithRecordVersion(largeMessage.records(), 0, 
RecordVersion.V0);
+        // move LSO forward to increase compaction bound
+        theLog.updateHighWatermark(theLog.logEndOffset());
+        long largeMessageOffset = appendInfo.firstOffset();
+
+        // also add some messages with version 1 and version 2 to check that 
we handle mixed format versions correctly
+        List<KeyValueOffset> dupsV1 = writeDups(40, 3, theLog, compression, 
30, RecordBatch.MAGIC_VALUE_V1);
+        List<KeyValueOffset> dupsV2 = writeDups(5, 3, theLog, compression, 15, 
RecordBatch.MAGIC_VALUE_V2);
+
+        Set<String> v0RecordKeysWithNoV1V2Updates = new HashSet<>();
+        Set<Integer> dupsV1Keys = new HashSet<>();
+        for (KeyValueOffset kvo : dupsV1) dupsV1Keys.add(kvo.key());
+        Set<Integer> dupsV2Keys = new HashSet<>();
+        for (KeyValueOffset kvo : dupsV2) dupsV2Keys.add(kvo.key());
+        for (KeyValueOffset kvo : appends1) {
+            if (!dupsV1Keys.contains(kvo.key()) && 
!dupsV2Keys.contains(kvo.key())) {
+                v0RecordKeysWithNoV1V2Updates.add(String.valueOf(kvo.key()));
+            }
+        }

Review Comment:
   I wonder if I can improve this a bit ... maybe this is a reason why I like 
functional languages ... :D Maybe using streams would be better?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to