[ 
https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13194.
-----------------------------
    Fix Version/s: 3.1.0
         Assignee: Lucas Bradstreet
       Resolution: Fixed

Merged the PR to trunk.

> LogCleaner may clean past highwatermark
> ---------------------------------------
>
>                 Key: KAFKA-13194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13194
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Bradstreet
>            Assignee: Lucas Bradstreet
>            Priority: Minor
>             Fix For: 3.1.0
>
>
> Here the cleaning point is bounded by the active segment base offset and the 
> first unstable offset, which makes sense:
>  
> {code:java}
>     // find first segment that cannot be cleaned
>     // neither the active segment, nor segments with any messages closer to
>     // the head of the log than the minimum compaction lag time may be cleaned
>     val firstUncleanableDirtyOffset: Long = Seq(
>       // we do not clean beyond the first unstable offset
>       log.firstUnstableOffset,
>       // the active segment is always uncleanable
>       Option(log.activeSegment.baseOffset),
>       // the first segment whose largest message timestamp is within a minimum time lag from now
>       if (minCompactionLagMs > 0) {
>         // dirty log segments
>         val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
>         dirtyNonActiveSegments.find { s =>
>           val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
>           debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
>             s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
>             s"is uncleanable=$isUncleanable")
>           isUncleanable
>         }.map(_.baseOffset)
>       } else None
>     ).flatten.min
> {code}
>  
> But the LSO starts out as None:
> {code:java}
> @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
>
> private[log] def firstUnstableOffset: Option[Long] =
>   firstUnstableOffsetMetadata.map(_.messageOffset)
> {code}
> Most code that depends on the LSO goes through fetchLastStableOffsetMetadata, 
> which defaults it to the high watermark (hwm) when it is not set:
>  
> {code:java}
>   private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
>     checkIfMemoryMappedBufferClosed()
>
>     // cache the current high watermark to avoid a concurrent update invalidating the range check
>     val highWatermarkMetadata = fetchHighWatermarkMetadata
>
>     firstUnstableOffsetMetadata match {
>       case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
>         if (offsetMetadata.messageOffsetOnly) {
>           lock synchronized {
>             val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
>             if (firstUnstableOffsetMetadata.contains(offsetMetadata))
>               firstUnstableOffsetMetadata = Some(fullOffset)
>             fullOffset
>           }
>         } else {
>           offsetMetadata
>         }
>       case _ => highWatermarkMetadata
>     }
>   }
> {code}
>  
> The log cleaner, however, reads firstUnstableOffset directly, so an unset LSO 
> places no bound on cleaning at all. This means that when the hwm is behind the 
> active segment base offset, the log cleaner may clean past the hwm. This is 
> most likely to occur after a broker restart, when the log cleaner may start 
> cleaning before replication becomes active.
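The failure mode can be sketched in a few lines of self-contained Scala. The values and method names below are hypothetical; buggyBound and boundedByHwm are simplified stand-ins for the real firstUncleanableDirtyOffset computation, not Kafka code. The point: Seq(...).flatten.min silently drops None entries, so an unset LSO contributes no bound, whereas defaulting it to the hwm (as fetchLastStableOffsetMetadata does for readers) restores one.

```scala
// Simplified model of the uncleanable-offset bound (hypothetical, not Kafka code).
object UncleanableBoundSketch {
  // Buggy form: an unset LSO (None) simply drops out of the min,
  // leaving only the active segment base offset as a bound.
  def buggyBound(lso: Option[Long], activeSegmentBase: Long): Long =
    Seq(lso, Some(activeSegmentBase)).flatten.min

  // Fixed form: default an unset LSO to the high watermark, mirroring
  // what fetchLastStableOffsetMetadata does for readers of the LSO.
  def boundedByHwm(lso: Option[Long], activeSegmentBase: Long, hwm: Long): Long =
    Seq(Some(lso.getOrElse(hwm)), Some(activeSegmentBase)).flatten.min

  def main(args: Array[String]): Unit = {
    // After a restart: LSO unset, hwm (40) behind the active segment base (100).
    assert(buggyBound(None, activeSegmentBase = 100L) == 100L)             // cleans past the hwm
    assert(boundedByHwm(None, activeSegmentBase = 100L, hwm = 40L) == 40L) // capped at the hwm
    println("ok")
  }
}
```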



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
