[ https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao resolved KAFKA-13194.
-----------------------------
    Fix Version/s: 3.1.0
         Assignee: Lucas Bradstreet
       Resolution: Fixed

Merged the PR to trunk.

> LogCleaner may clean past highwatermark
> ---------------------------------------
>
>                 Key: KAFKA-13194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13194
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Bradstreet
>            Assignee: Lucas Bradstreet
>            Priority: Minor
>             Fix For: 3.1.0
>
>
> Here we have the cleaning point being bounded to the active segment base
> offset and the first unstable offset. Which makes sense:
>
> {code:java}
> // find first segment that cannot be cleaned
> // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
> // may be cleaned
> val firstUncleanableDirtyOffset: Long = Seq(
>
>   // we do not clean beyond the first unstable offset
>   log.firstUnstableOffset,
>
>   // the active segment is always uncleanable
>   Option(log.activeSegment.baseOffset),
>
>   // the first segment whose largest message timestamp is within a minimum time lag from now
>   if (minCompactionLagMs > 0) {
>     // dirty log segments
>     val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
>     dirtyNonActiveSegments.find { s =>
>       val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
>       debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
>         s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
>         s"is uncleanable=$isUncleanable")
>       isUncleanable
>     }.map(_.baseOffset)
>   } else None
> ).flatten.min
> {code}
>
> But LSO starts out as None.
> {code:java}
> @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
> private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
> {code}
> For most code depending on the LSO, fetchLastStableOffsetMetadata is used to
> default it to the hwm if it's not set.
>
> {code:java}
> private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
>   checkIfMemoryMappedBufferClosed()
>
>   // cache the current high watermark to avoid a concurrent update invalidating the range check
>   val highWatermarkMetadata = fetchHighWatermarkMetadata
>
>   firstUnstableOffsetMetadata match {
>     case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
>       if (offsetMetadata.messageOffsetOnly) {
>         lock synchronized {
>           val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
>           if (firstUnstableOffsetMetadata.contains(offsetMetadata))
>             firstUnstableOffsetMetadata = Some(fullOffset)
>           fullOffset
>         }
>       } else {
>         offsetMetadata
>       }
>     case _ => highWatermarkMetadata
>   }
> }
> {code}
>
>
> This means that in the case where the hwm is prior to the active segment
> base, the log cleaner may clean past the hwm. This is most likely to occur
> after a broker restart when the log cleaner may start cleaning prior to
> replication becoming active.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
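
The scenario described in the issue can be reproduced with a small standalone model. The sketch below is illustrative only: `LogState`, `CleaningBound`, and their members are hypothetical names, not Kafka classes, and the minimum compaction lag term is omitted. It shows that when the first unstable offset is `None`, the bound quoted above collapses to the active segment base offset, while a bound that falls back to the high watermark (the defaulting that fetchLastStableOffsetMetadata performs) stays at or below the hwm. This is one possible way to close the gap under these assumptions and is not necessarily the change that was merged.

```scala
// Hypothetical, simplified model of the state involved; not Kafka's actual classes.
final case class LogState(
  firstUnstableOffset: Option[Long], // None until a first unstable offset has been recorded
  highWatermark: Long,
  activeSegmentBaseOffset: Long
) {
  // Same defaulting idea as fetchLastStableOffsetMetadata: use the first unstable
  // offset only when it is below the high watermark, otherwise use the high watermark.
  def lastStableOffset: Long =
    firstUnstableOffset.filter(_ < highWatermark).getOrElse(highWatermark)
}

object CleaningBound {
  // Bound as quoted in the issue (min compaction lag omitted): a None
  // first unstable offset simply drops out of the minimum.
  def unboundedByHwm(log: LogState): Long =
    Seq(log.firstUnstableOffset, Option(log.activeSegmentBaseOffset)).flatten.min

  // Alternative sketch: the LSO with hwm fallback always contributes a value,
  // so the result can never exceed the high watermark.
  def boundedByHwm(log: LogState): Long =
    Seq(Option(log.lastStableOffset), Option(log.activeSegmentBaseOffset)).flatten.min
}

object CleaningBoundDemo {
  def main(args: Array[String]): Unit = {
    // After a restart: hwm (100) is still behind the active segment base (500).
    val afterRestart = LogState(firstUnstableOffset = None, highWatermark = 100L, activeSegmentBaseOffset = 500L)
    println(CleaningBound.unboundedByHwm(afterRestart)) // prints 500, past the hwm
    println(CleaningBound.boundedByHwm(afterRestart))   // prints 100, capped at the hwm
  }
}
```

In the modelled restart case the first bound lets cleaning proceed up to offset 500 even though only offsets below 100 are known to be replicated, which matches the behaviour the issue reports.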