hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r699714060



##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the 
create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the 
batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the 
timestamp deltas.

Review comment:
       nit: the base timestamp is used to calculate the record timestamps from the deltas (not to calculate the deltas themselves)

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -500,14 +500,15 @@ private[log] class Cleaner(val id: Int,
         case None => 0L
         case Some(seg) => seg.lastModified - 
cleanable.log.config.deleteRetentionMs
     }
-
-    doClean(cleanable, deleteHorizonMs)
+    doClean(cleanable, time.milliseconds(), legacyDeleteHorizonMs = 
deleteHorizonMs)
   }
 
-  private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): 
(Long, CleanerStats) = {
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long, 
legacyDeleteHorizonMs: Long = -1L): (Long, CleanerStats) = {

Review comment:
       Do we need `legacyDeleteHorizonMs` as a parameter? As far as I can tell, 
there are no cases in the tests which override it. Maybe we could just compute 
it here instead of in `clean`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -622,26 +628,38 @@ private[log] class Cleaner(val id: Int,
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding 
topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
+   *
+   * @return the latestDeleteHorizon that is found from the FilterResult of 
the cleaning
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, 
LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Long = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, 
deleteRetentionMs) {
       var discardBatchRecords: Boolean = _
 
-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of 
transaction markers.
         // note that we will never delete a marker until all the records from 
that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, 
retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch) {
+            discardBatchRecords = canDiscardBatch && 
batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= 
currentTime

Review comment:
       nit: misaligned indentation (the body of this `if` is indented four spaces deeper than the `if` instead of two)

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -658,23 +676,28 @@ private[log] class Cleaner(val id: Int,
           }
         }
 
-        if (batch.hasProducerId && isBatchLastRecordOfProducer)
-          BatchRetention.RETAIN_EMPTY
-        else if (discardBatchRecords)
-          BatchRetention.DELETE
-        else
-          BatchRetention.DELETE_EMPTY
+        val batchRetention: BatchRetention =
+          if (batch.hasProducerId && isBatchLastRecordOfProducer)
+            BatchRetention.RETAIN_EMPTY
+          else if (discardBatchRecords)
+            BatchRetention.DELETE
+          else
+            BatchRetention.DELETE_EMPTY
+        new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
       }
 
       override def shouldRetainRecord(batch: RecordBatch, record: Record): 
Boolean = {
+        var isRecordRetained: Boolean = true

Review comment:
       Why do we need this `var`?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -776,7 +801,13 @@ private[log] class Cleaner(val id: Int,
        *   2) The message doesn't has value but it can't be deleted now.
        */
       val latestOffsetForKey = record.offset() >= foundOffset
-      val isRetainedValue = record.hasValue || retainDeletes
+      val supportDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
+      val shouldRetainDeletes =

Review comment:
       nit: maybe turn this into a `def` since we don't even use the computed 
value unless `record.hasValue` is false.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove 
tombstones if we can
+            // under the condition that the log's latest delete horizon is 
less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && 
ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
       When the broker is initialized, `log.latestDeleteHorizon` will be `NO_TIMESTAMP`, and it only gets a value after at least one cleaning run has completed. Is there another condition we can rely on to ensure that the cleaning still occurs?

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 

Review comment:
       nit: no need for `case`. Usually we write this as
   ```scala
   dirtyLogs.filter { ltc =>
   ...
   }
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -522,13 +523,13 @@ private[log] class Cleaner(val id: Int,
     val cleanableHorizonMs = log.logSegments(0, 
cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
     // group the segments and clean the groups
-    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior 
to %s)...".format(log.name, new Date(cleanableHorizonMs), new 
Date(deleteHorizonMs)))
+    info("Cleaning log %s (cleaning prior to %s, discarding legacy tombstones 
prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new 
Date(legacyDeleteHorizonMs)))

Review comment:
       It might not be clear what a "legacy tombstone" means. Would it be fair to call this an upper bound on the deletion horizon?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +162,13 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the 
create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the 
batch is empty
+     * Gets the base timestamp of the batch which is used to calculate the 
timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty

Review comment:
       I think we can leave off the comment about the batch being empty since 
we're not using this for the first timestamp anymore.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -544,17 +545,19 @@ private[log] class Cleaner(val id: Int,
    * @param log The log being cleaned
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
-   * @param deleteHorizonMs The time to retain delete tombstones
+   * @param currentTime The current time in milliseconds
    * @param stats Collector for cleaning statistics
    * @param transactionMetadata State of ongoing transactions which is carried 
between the cleaning
    *                            of the grouped segments
+   * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose 
version is less than 2
    */
   private[log] def cleanSegments(log: UnifiedLog,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
-                                 deleteHorizonMs: Long,
+                                 currentTime: Long,
                                  stats: CleanerStats,
-                                 transactionMetadata: 
CleanedTransactionMetadata): Unit = {
+                                 transactionMetadata: 
CleanedTransactionMetadata,
+                                 legacyDeleteHorizonMs: Long = -1L): Unit = {

Review comment:
       Can we make this a required parameter? We try to avoid optional 
parameters because it is easy to miss them.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -574,14 +577,17 @@ private[log] class Cleaner(val id: Int,
         val abortedTransactions = log.collectAbortedTransactions(startOffset, 
upperBoundOffset)
         transactionMetadata.addAbortedTransactions(abortedTransactions)
 
-        val retainDeletesAndTxnMarkers = currentSegment.lastModified > 
deleteHorizonMs
+        val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > 
legacyDeleteHorizonMs
         info(s"Cleaning $currentSegment in log ${log.name} into 
${cleaned.baseOffset} " +
-          s"with deletion horizon $deleteHorizonMs, " +
-          s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} 
deletes.")
+          s"with legacy deletion horizon $legacyDeleteHorizonMs, " +
+          s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else 
"discarding"} deletes.")

Review comment:
       This log message becomes confusing after this change. How about something like this?
   ```scala
   s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
       s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " +
       s"the segment last modified time of ${currentSegment.lastModified}"
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
       val cleanableLogs = dirtyLogs.filter { ltc =>
         (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
       }
+
       if(cleanableLogs.isEmpty) {
-        None
+        val logsWithTombstonesExpired = dirtyLogs.filter {
+          case ltc => 
+            // in this case, we are probably in a low throughput situation
+            // therefore, we should take advantage of this fact and remove 
tombstones if we can
+            // under the condition that the log's latest delete horizon is 
less than the current time
+            // tracked
+            ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && 
ltc.log.latestDeleteHorizon <= time.milliseconds()
+        }
+        if (!logsWithTombstonesExpired.isEmpty) {

Review comment:
       nit: use `nonEmpty`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to