junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r646791532
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1578,144 +1409,85 @@ class Log(@volatile private var _dir: File,
         .map(_.messageOffset)
         .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)

-      roll(Some(rollOffset))
+      val newSegment = localLog.roll(Some(rollOffset))
+      afterRoll(newSegment)
+      newSegment

Review comment:
       Could we replace the above 3 lines with `roll(Some(rollOffset))`?
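For context, the suggestion reads as if `Log.roll()` is expected to wrap the local roll plus the Log-level post-roll bookkeeping itself, so the call site above can stay a one-liner. A minimal sketch of that shape; the method body here is an assumption for illustration, with only `localLog.roll` and `afterRoll` taken from the diff:

```scala
// Hypothetical sketch: Log.roll() delegates to LocalLog and then runs the Log-level
// post-roll work, so callers only need to invoke roll(Some(rollOffset)).
def roll(expectedNextOffset: Option[Long] = None): LogSegment = lock synchronized {
  val newSegment = localLog.roll(expectedNextOffset) // roll the local log to a new active segment
  afterRoll(newSegment)                              // Log-level bookkeeping after the roll
  newSegment
}
```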
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1812,37 +1577,36 @@ class Log(@volatile private var _dir: File,
     endOffset: Long
   ): Unit = {
     logStartOffset = startOffset
-    nextOffsetMetadata = LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size)
-    recoveryPoint = math.min(recoveryPoint, endOffset)
+    localLog.updateLogEndOffset(endOffset)

Review comment:
       We need to preserve the LogOffsetMetadata for endOffset and use it to call updateHighWatermark.
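In other words, the caller should keep the full LogOffsetMetadata for `endOffset` (message offset, segment base offset, position within the segment) rather than collapsing it to a bare `Long`, and hand that metadata to `updateHighWatermark`. A rough sketch of the intent, assuming an enclosing method named `completeTruncation` purely for illustration (only the names visible in the diff and in the comment above are taken from the PR):

```scala
// Illustrative only: after updating the local log's end offset, reuse the freshly built
// offset metadata for the high watermark so segment/position information is preserved.
private def completeTruncation(startOffset: Long, endOffset: Long): Unit = {
  logStartOffset = startOffset
  localLog.updateLogEndOffset(endOffset)
  updateHighWatermark(localLog.logEndOffsetMetadata)
}
```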
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1578,144 +1409,85 @@ class Log(@volatile private var _dir: File,
         .map(_.messageOffset)
         .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)

-      roll(Some(rollOffset))
+      val newSegment = localLog.roll(Some(rollOffset))
+      afterRoll(newSegment)
+      newSegment
     } else {
       segment
     }
   }

   /**
-   * Roll the log over to a new active segment starting with the current logEndOffset.
+   * Roll the local log over to a new active segment starting with the current logEndOffset.

Review comment:
       This comment is not very accurate since we roll to expectedNextOffset or logEndOffset.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1788,14 +1559,8 @@ class Log(@volatile private var _dir: File,
     maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
       debug(s"Truncate and start at offset $newOffset")
       lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        removeAndDeleteSegments(logSegments, asyncDelete = true, LogTruncation)
-        addSegment(LogSegment.open(dir,
-          baseOffset = newOffset,
-          config = config,
-          time = time,
-          initFileSize = config.initFileSize,
-          preallocate = config.preallocate))
+        val deletedSegments = localLog.truncateFullyAndStartAt(newOffset)
+        deleteProducerSnapshots(deletedSegments, asyncDelete = true)

Review comment:
       `producerStateManager.truncateFullyAndStartAt()` removes all producer snapshots. So, this is necessary.

##########
File path: core/src/main/scala/kafka/log/LocalLog.scala
##########
@@ -0,0 +1,1010 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.{File, IOException}
+import java.nio.file.Files
+import java.text.NumberFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.regex.Pattern
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{Logging, Scheduler}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.Seq
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+/**
+ * Holds the result of splitting a segment into one or more segments, see LocalLog.splitOverflowedSegment().
+ *
+ * @param deletedSegments segments deleted when splitting a segment
+ * @param newSegments new segments created when splitting a segment
+ */
+case class SplitSegmentResult(deletedSegments: Iterable[LogSegment], newSegments: Iterable[LogSegment])
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param segments The non-empty log segments recovered from disk
+ * @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
+ * @param nextOffsetMetadata The offset where the next message could be appended
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ */
+private[log] class LocalLog(@volatile private var _dir: File,
+                            @volatile var config: LogConfig,
+                            val segments: LogSegments,
+                            @volatile var recoveryPoint: Long,
+                            @volatile private var nextOffsetMetadata: LogOffsetMetadata,
+                            val scheduler: Scheduler,
+                            val time: Time,
+                            val topicPartition: TopicPartition,
+                            val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[LocalLog partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log.
+  @volatile private[log] var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  // Last time the log was flushed
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  private[log] def dir: File = _dir
+
+  private[log] def name: String = dir.getName()
+
+  private[log] def parentDir: String = _parentDir
+
+  private[log] def parentDirFile: File = new File(_parentDir)
+
+  private[log] def isFuture: Boolean = dir.getName.endsWith(LocalLog.FutureDirSuffix)
+
+  private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
+    LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, msg) {
+      fun
+    }
+  }
+
+  /**
+   * Rename the directory of the log
+   * @param name the new dir name
+   * @throws KafkaStorageException if rename fails
+   */
+  private[log] def renameDir(name: String): Boolean = {
+    maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
+      val renamedDir = new File(dir.getParent, name)
+      Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
+      if (renamedDir != dir) {
+        _dir = renamedDir
+        _parentDir = renamedDir.getParent
+        segments.updateParentDir(renamedDir)
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  /**
+   * Update the existing configuration to the new provided configuration.
+   * @param newConfig the new configuration to be updated to
+   */
+  private[log] def updateConfig(newConfig: LogConfig): Unit = {
+    val oldConfig = config
+    config = newConfig
+    val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+    val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+    if (newRecordVersion.precedes(oldRecordVersion))
+      warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
+  }
+
+  private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
+    if (isMemoryMappedBufferClosed)
+      throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
+  }
+
+  private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
+    recoveryPoint = newRecoveryPoint
+  }
+
+  /**
+   * Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater
+   * than the existing recoveryPoint.
+   *
+   * @param offset the offset to be updated
+   */
+  private[log] def markFlushed(offset: Long): Unit = {
+    checkIfMemoryMappedBufferClosed()
+    if (offset > recoveryPoint) {
+      updateRecoveryPoint(offset)
+      lastFlushedTime.set(time.milliseconds)
+    }
+  }
+
+  /**
+   * Flush local log segments for all offsets up to offset-1.
+   * Does not update the recovery point.
+   *
+   * @param offset The offset to flush up to (non-inclusive)
+   */
+  private[log] def flush(offset: Long): Unit = {
+    val segmentsToFlush = segments.values(recoveryPoint, offset)
+    segmentsToFlush.foreach(_.flush())
+    // If there are any new segments, we need to flush the parent directory for crash consistency.
+    segmentsToFlush.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
+  }
+
+  /**
+   * The time this log is last known to have been fully flushed to disk
+   */
+  private[log] def lastFlushTime: Long = lastFlushedTime.get
+
+  /**
+   * The offset metadata of the next message that will be appended to the log
+   */
+  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+  /**
+   * The offset of the next message that will be appended to the log
+   */
+  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
+
+  /**
+   * Update end offset of the log, and update the recoveryPoint.
+   *
+   * @param endOffset the new end offset of the log
+   */
+  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
+    nextOffsetMetadata = LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
+    if (recoveryPoint > endOffset) {
+      updateRecoveryPoint(endOffset)
+    }
+  }
+
+  /**
+   * Close file handlers used by log but don't write to disk.
+   * This is called if the log directory is offline.
+   */
+  private[log] def closeHandlers(): Unit = {
+    segments.closeHandlers()
+    isMemoryMappedBufferClosed = true
+  }
+
+  /**
+   * Closes the segments of the log.
+   */
+  private[log] def close(): Unit = {
+    maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
+      checkIfMemoryMappedBufferClosed()
+      segments.close()
+    }
+  }
+
+  /**
+   * Completely delete this log directory with no delay.
+   */
+  private[log] def deleteEmptyDir(): Unit = {
+    maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
+      checkIfMemoryMappedBufferClosed()
+      if (segments.nonEmpty) {
+        throw new IllegalStateException(s"Can not delete directory when ${segments.numberOfSegments} segments are still present")
+      }
+      Utils.delete(dir)
+      // File handlers will be closed if this log is deleted
+      isMemoryMappedBufferClosed = true

Review comment:
       It seems that we should set isMemoryMappedBufferClosed in deleteAllSegments()?
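A minimal sketch of the suggested placement, assuming LocalLog gains a `deleteAllSegments()` method that removes every segment; the body is illustrative and `removeAndDeleteSegments` is a hypothetical helper — the point is only where the flag gets set:

```scala
// Illustrative sketch: flip isMemoryMappedBufferClosed where the segments are actually
// deleted, so later disk I/O attempts fail fast via checkIfMemoryMappedBufferClosed().
private[log] def deleteAllSegments(): Iterable[LogSegment] = {
  maybeHandleIOException(s"Error while deleting all segments for $topicPartition in dir ${dir.getParent}") {
    val deletableSegments = segments.values.toSeq
    removeAndDeleteSegments(deletableSegments, asyncDelete = false) // hypothetical helper that drops and deletes the segment files
    isMemoryMappedBufferClosed = true                               // set here, where the file handles actually go away
    deletableSegments
  }
}
```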
##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -246,17 +262,17 @@ object LogLoader extends Logging {
         return fn
       } catch {
         case e: LogSegmentOffsetOverflowException =>
-          info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
-          Log.splitOverflowedSegment(
+          info(s"${params.logIdentifier} Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
+          val result = Log.splitOverflowedSegment(
             e.segment,
             params.segments,
             params.dir,
             params.topicPartition,
             params.config,
             params.scheduler,
             params.logDirFailureChannel,
-            params.producerStateManager,
             params.logIdentifier)
+          deleteProducerSnapshotsAsync(result.deletedSegments, params)

Review comment:
       Yes, that's fine.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org