junrao commented on code in PR #18321:
URL: https://github.com/apache/kafka/pull/18321#discussion_r1904791916
##########
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##########
@@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change")
   }
 
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): Unit = {
+    val compression = Compression.of(compressionType).build()
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, compression)
+    val maxMessageSize = compression match {
+      case Compression.NONE => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones assigned by the client
+        // adding `6` to the message set size is good enough for this test: it covers the increased message size while
+        // still being less than the overhead introduced by the conversion from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 6

Review Comment:
   This is probably unnecessary. LogCleaner can grow the readBuffer to larger than maxMessageSize based on the batch size in the header.

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -509,8 +509,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
-    leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
-      dir, topicPartition, logDirFailureChannel, logIdent, leaderEpochCache, scheduler)
+    leaderEpochCache = Some(UnifiedLog.createLeaderEpochCache(

Review Comment:
   Ideally, we want to change `leaderEpochCache` so that it is no longer an `Option`. If that is too much for this PR, could we file a follow-up jira?

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -757,7 +769,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                      leaderEpoch: Int,
                      requestLocal: Option[RequestLocal],
                      verificationGuard: VerificationGuard,
-                     ignoreRecordSize: Boolean): LogAppendInfo = {
+                     ignoreRecordSize: Boolean,
+                     toMagic: Byte = RecordBatch.CURRENT_MAGIC_VALUE): LogAppendInfo = {

Review Comment:
   For testing, we could append records with `validateAndAssignOffsets=false` and `origin=AppendOrigin.REPLICATION`. This way, we can get rid of `toMagic` here and the `toMagic` in `LogValidator`, since it's always 2. This also probably allows us to remove some code in `LogValidator`, such as `convertAndAssignOffsetsNonCompressed()`. The slight inconvenience is that the caller needs to assign the right offsets in the records.
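   To make that suggestion concrete, here is a rough sketch (not code from this PR) of what the test-side append could look like. It assumes the `log` and `compression` values already set up in the test, and that `UnifiedLog.appendAsFollower` still accepts just the records; the exact `MemoryRecords.withRecords` overload may differ on current trunk.

```scala
// Sketch only: build a V0 batch with offsets already assigned by the caller and
// append it through the follower path, so the broker-side validator never needs
// to reassign offsets or up-convert (i.e. no toMagic parameter is required).
val baseOffset = log.logEndOffset
val records = MemoryRecords.withRecords(
  RecordBatch.MAGIC_VALUE_V0,
  baseOffset,                         // caller-assigned absolute offset
  compression,
  TimestampType.CREATE_TIME,
  new SimpleRecord("20".getBytes, "value".getBytes)
)
// appendAsFollower goes through origin = AppendOrigin.REPLICATION with
// validateAndAssignOffsets = false, which is the combination suggested above.
log.appendAsFollower(records)
```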
##########
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##########
@@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change")
   }
 
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): Unit = {
+    val compression = Compression.of(compressionType).build()
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, compression)
+    val maxMessageSize = compression match {
+      case Compression.NONE => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones assigned by the client
+        // adding `6` to the message set size is good enough for this test: it covers the increased message size while
+        // still being less than the overhead introduced by the conversion from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 6
+    }
+
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+
+    val log = cleaner.logs.get(topicPartitions(0))
+    val props = logConfigProperties(maxMessageSize = maxMessageSize)
+    props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name)
+    val logConfig = new LogConfig(props)
+    log.updateConfig(logConfig)
+
+    val appends1 = writeDups(numKeys = 100, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.asScala.map(_.size).sum
+    assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
+
+    checkLogAfterAppendingDups(log, startSize, appends1)
+
+    val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    val appendInfo = log.appendAsLeaderWithRecordVersion(largeMessageSet, leaderEpoch = 0, recordVersion = RecordVersion.V0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
+    val largeMessageOffset = appendInfo.firstOffset
+
+    // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
+    val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V2)
+
+    val v0RecordKeysWithNoV1V2Updates = (appends1.map(_._1).toSet -- dupsV1.map(_._1) -- dupsV2.map(_._1)).map(_.toString)
+    val appends2: Seq[(Int, String, Long)] =
+      appends1 ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
+
+    // roll the log so that all appended messages can be compacted
+    log.roll()
+    val firstDirty2 = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty2)
+
+    checkLogAfterAppendingDups(log, startSize, appends2)
+    checkLogAfterConvertingToV2(compressionType, log, logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates)
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleaningNestedMessagesWithV0V1(compressionType: CompressionType): Unit = {
+    val compression = Compression.of(compressionType).build()
+    val maxMessageSize = 192
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256)
+
+    val log = cleaner.logs.get(topicPartitions(0))
+    val logConfig = new LogConfig(logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256))
+    log.updateConfig(logConfig)
+
+    // with compression enabled, these messages will be written as a single message containing all the individual messages
+    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+
+    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+
+    val appends = appendsV0 ++ appendsV1
+
+    val v0RecordKeysWithNoV1V2Updates = (appendsV0.map(_._1).toSet -- appendsV1.map(_._1)).map(_.toString)
+
+    // roll the log so that all appended messages can be compacted
+    log.roll()
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1

Review Comment:
   To ensure we clean v1 data, we need to include appendsV1.size too, right?
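   For reference, a sketch of the strengthened check being suggested (illustrative only; the exact bound depends on how many offsets each `writeDupsSingleMessageSet` call consumes):

```scala
// ensure the cleaner has progressed past both the V0 and the V1 appends
assertTrue(firstDirty > appendsV0.size + appendsV1.size)
```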