ijuma commented on code in PR #18321: URL: https://github.com/apache/kafka/pull/18321#discussion_r1904849231
########## core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala: ########## @@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change") } + @ParameterizedTest + @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd]) + def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): Unit = { + val compression = Compression.of(compressionType).build() + val largeMessageKey = 20 + val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, compression) + val maxMessageSize = compression match { + case Compression.NONE => largeMessageSet.sizeInBytes + case _ => + // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to + // increase because the broker offsets are larger than the ones assigned by the client + // adding `6` to the message set size is good enough for this test: it covers the increased message size while + // still being less than the overhead introduced by the conversion from message format version 0 to 1 + largeMessageSet.sizeInBytes + 6 + } + + cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) + + val log = cleaner.logs.get(topicPartitions(0)) + val props = logConfigProperties(maxMessageSize = maxMessageSize) + props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name) + val logConfig = new LogConfig(props) + log.updateConfig(logConfig) + + val appends1 = writeDups(numKeys = 100, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0) + val startSize = log.size + cleaner.startup() + + val firstDirty = log.activeSegment.baseOffset + checkLastCleaned("log", 0, firstDirty) + val compactedSize = log.logSegments.asScala.map(_.size).sum + assertTrue(startSize > compactedSize, 
s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize") + + checkLogAfterAppendingDups(log, startSize, appends1) + + val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0) + val appendInfo = log.appendAsLeaderWithRecordVersion(largeMessageSet, leaderEpoch = 0, recordVersion = RecordVersion.V0) + // move LSO forward to increase compaction bound + log.updateHighWatermark(log.logEndOffset) + val largeMessageOffset = appendInfo.firstOffset + + // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly + val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) + val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V2) + + val v0RecordKeysWithNoV1V2Updates = (appends1.map(_._1).toSet -- dupsV1.map(_._1) -- dupsV2.map(_._1)).map(_.toString) + val appends2: Seq[(Int, String, Long)] = + appends1 ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2 + + // roll the log so that all appended messages can be compacted + log.roll() + val firstDirty2 = log.activeSegment.baseOffset + checkLastCleaned("log", 0, firstDirty2) + + checkLogAfterAppendingDups(log, startSize, appends2) + checkLogAfterConvertingToV2(compressionType, log, logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates) + } + + @ParameterizedTest + @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd]) + def testCleaningNestedMessagesWithV0V1(compressionType: CompressionType): Unit = { + val compression = Compression.of(compressionType).build() + val maxMessageSize = 192 + cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256) + + val log = cleaner.logs.get(topicPartitions(0)) + val logConfig 
= new LogConfig(logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256)) + log.updateConfig(logConfig) + + // with compression enabled, these messages will be written as a single message containing all the individual messages + var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0) + appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0) + + var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) + appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) + appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) + + val appends = appendsV0 ++ appendsV1 + + val v0RecordKeysWithNoV1V2Updates = (appendsV0.map(_._1).toSet -- appendsV1.map(_._1)).map(_.toString) + + // roll the log so that all appended messages can be compacted + log.roll() + val startSize = log.size + cleaner.startup() + + val firstDirty = log.activeSegment.baseOffset + assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1 Review Comment: Makes sense - the original code did not include the `log.roll()` call, which is why the assertion was written this way. I'll make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org