ijuma commented on code in PR #18321:
URL: https://github.com/apache/kafka/pull/18321#discussion_r1904849231


##########
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##########
@@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrationTest {
     assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change")
   }
 
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): Unit = {
+    val compression = Compression.of(compressionType).build()
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, compression)
+    val maxMessageSize = compression match {
+      case Compression.NONE => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones assigned by the client
+        // adding `6` to the message set size is good enough for this test: it covers the increased message size while
+        // still being less than the overhead introduced by the conversion from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 6
+    }
+
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+
+    val log = cleaner.logs.get(topicPartitions(0))
+    val props = logConfigProperties(maxMessageSize = maxMessageSize)
+    props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name)
+    val logConfig = new LogConfig(props)
+    log.updateConfig(logConfig)
+
+    val appends1 = writeDups(numKeys = 100, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.asScala.map(_.size).sum
+    assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
+
+    checkLogAfterAppendingDups(log, startSize, appends1)
+
+    val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    val appendInfo = log.appendAsLeaderWithRecordVersion(largeMessageSet, leaderEpoch = 0, recordVersion = RecordVersion.V0)
+    // move LSO forward to increase compaction bound
+    log.updateHighWatermark(log.logEndOffset)
+    val largeMessageOffset = appendInfo.firstOffset
+
+    // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
+    val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V2)
+
+    val v0RecordKeysWithNoV1V2Updates = (appends1.map(_._1).toSet -- dupsV1.map(_._1) -- dupsV2.map(_._1)).map(_.toString)
+    val appends2: Seq[(Int, String, Long)] =
+      appends1 ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
+
+    // roll the log so that all appended messages can be compacted
+    log.roll()
+    val firstDirty2 = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty2)
+
+    checkLogAfterAppendingDups(log, startSize, appends2)
+    checkLogAfterConvertingToV2(compressionType, log, logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates)
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleaningNestedMessagesWithV0V1(compressionType: CompressionType): Unit = {
+    val compression = Compression.of(compressionType).build()
+    val maxMessageSize = 192
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256)
+
+    val log = cleaner.logs.get(topicPartitions(0))
+    val logConfig = new LogConfig(logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256))
+    log.updateConfig(logConfig)
+
+    // with compression enabled, these messages will be written as a single message containing all the individual messages
+    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
+
+    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
+
+    val appends = appendsV0 ++ appendsV1
+
+    val v0RecordKeysWithNoV1V2Updates = (appendsV0.map(_._1).toSet -- appendsV1.map(_._1)).map(_.toString)
+
+    // roll the log so that all appended messages can be compacted
+    log.roll()
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1

Review Comment:
   Makes sense - the original code did not include the `log.roll` call, which is why the assertion was written this way. I'll make the change.
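
   For reference, a minimal sketch of what the tightened check could look like - a hypothetical illustration, not the committed change: once `log.roll()` has run, the active segment's base offset equals the log end offset, so the dirty-section start covers every appended batch (V0 and V1 alike) and the loose `firstDirty > appendsV0.size` bound can become an exact assertion. `log` here is the test's log instance from the snippet above.

   ```scala
   // Hypothetical sketch, reusing `log` from the test above (not the committed change):
   // rolling guarantees activeSegment.baseOffset == logEndOffset, so everything
   // appended so far - both the V0 and the V1 batches - lies in rolled segments
   // and is eligible for cleaning.
   log.roll()
   val firstDirty = log.activeSegment.baseOffset
   assertEquals(log.logEndOffset, firstDirty) // all V0 and V1 appends are cleanable
   ```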


