kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r645437282
########## File path: core/src/main/scala/kafka/log/Log.scala ##########
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
       }
     }

-    deleteOldSegments(shouldDelete, RetentionSizeBreach)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }

   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }

-    deleteOldSegments(shouldDelete, StartOffsetBreach)
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }

   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)

   /**
    * The size of the log in bytes
    */
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes

   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long = localLog.logEndOffset

   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment: Done.
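For readers following the refactor: the hunk above introduces a `rollAction` that Log later passes into `localLog.roll(...)`. Below is a minimal, self-contained sketch of that callback-bundle idea, assuming `RollAction` simply groups hooks that LocalLog invokes around a roll so that work owned by Log (producer-state snapshots, scheduling a flush of the old segment) still runs at the right points. `RollHooks`, `LocalLogStub`, and every signature below are illustrative stand-ins, not the PR's actual definitions.

    // Illustrative sketch only -- not the PR's actual RollAction API.
    object RollHooksSketch extends App {
      // Hypothetical bundle of callbacks that Log hands to LocalLog.
      final case class RollHooks(preRoll: Long => Unit, postRoll: Long => Unit)

      // Hypothetical stand-in for LocalLog: it owns the mechanics of rolling,
      // but calls back into the hooks so Log-level bookkeeping still happens.
      final class LocalLogStub(@volatile var logEndOffset: Long) {
        def roll(expectedNextOffset: Option[Long], hooks: Option[RollHooks]): Long = {
          val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
          hooks.foreach(_.preRoll(newOffset))   // e.g. take a producer-state snapshot first
          logEndOffset = newOffset              // stand-in for creating the new active segment
          hooks.foreach(_.postRoll(newOffset))  // e.g. schedule an async flush of the old segment
          newOffset
        }
      }

      val localLog = new LocalLogStub(logEndOffset = 42L)
      val hooks = RollHooks(
        preRoll = offset => println(s"pre-roll: snapshot producer state at $offset"),
        postRoll = offset => println(s"post-roll: schedule flush up to $offset")
      )
      println(s"rolled new segment at offset ${localLog.roll(None, Some(hooks))}")
    }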
Existing " + - s"segment is ${segments.get(newOffset)}.") - } - } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) { - throw new KafkaException( - s"Trying to roll a new log segment for topic partition $topicPartition with " + - s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") - } else { - val offsetIdxFile = offsetIndexFile(dir, newOffset) - val timeIdxFile = timeIndexFile(dir, newOffset) - val txnIdxFile = transactionIndexFile(dir, newOffset) - - for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { - warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") - Files.delete(file.toPath) - } - - segments.lastSegment.foreach(_.onBecomeInactiveSegment()) - } - - // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot - // offset align with the new segment offset since this ensures we can recover the segment by beginning - // with the corresponding snapshot file and scanning the segment data. Because the segment base offset - // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), - // we manually override the state offset here prior to taking the snapshot. - producerStateManager.updateMapEndOffset(newOffset) - producerStateManager.takeSnapshot() - - val segment = LogSegment.open(dir, - baseOffset = newOffset, - config, - time = time, - initFileSize = config.initFileSize, - preallocate = config.preallocate) - addSegment(segment) - - // We need to update the segment base offset and append position data of the metadata when log rolls. - // The next offset should not change. 
########## File path: core/src/main/scala/kafka/log/Log.scala ##########
@@ -1806,37 +1566,37 @@ class Log(@volatile private var _dir: File,
     endOffset: Long
   ): Unit = {
     logStartOffset = startOffset
-    nextOffsetMetadata = LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size)
-    recoveryPoint = math.min(recoveryPoint, endOffset)
-    rebuildProducerState(endOffset, producerStateManager)
+    lock synchronized {
+      rebuildProducerState(endOffset, producerStateManager)
+    }

Review comment: Done.

########## File path: core/src/main/scala/kafka/log/Log.scala ##########
@@ -1852,65 +1612,24 @@ class Log(@volatile private var _dir: File,
     logString.toString
   }

-  /**
-   * This method deletes the given log segments by doing the following for each of them:
-   * <ol>
-   *   <li>It removes the segment from the segment map so that it will no longer be used for reads.
-   *   <li>It renames the index and log files by appending .deleted to the respective file name
-   *   <li>It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously
-   * </ol>
-   * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
-   * or the immediate caller will catch and handle IOException
-   *
-   * @param segments The log segments to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted asynchronously
-   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
-                                      asyncDelete: Boolean,
-                                      reason: SegmentDeletionReason): Unit = {
-    if (segments.nonEmpty) {
-      lock synchronized {
-        // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
-        // removing the deleted segment, we should force materialization of the iterator here, so that results of the
-        // iteration remain valid and deterministic.
-        val toDelete = segments.toList
-        reason.logReason(this, toDelete)
-        toDelete.foreach { segment =>
-          this.segments.remove(segment.baseOffset)
-        }
-        deleteSegmentFiles(toDelete, asyncDelete)
-      }
-    }
-  }
-
-  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
-    Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
-      config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
-  }
-
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {

Review comment: Done.
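The doc comment removed above describes the rename-then-delete scheme for retired segments (that logic now lives outside Log in this PR). Below is a rough, self-contained sketch of the same idea; the single-thread scheduler, temp file, and 1-second delay are simplified stand-ins for Kafka's real helpers and its file.delete.delay.ms setting, not the project's actual code.

    // Illustrative sketch of rename-to-.deleted followed by delayed physical deletion.
    import java.nio.file.{Files, Path, Paths, StandardCopyOption}
    import java.util.concurrent.{Executors, TimeUnit}

    object AsyncDeleteSketch extends App {
      val scheduler = Executors.newSingleThreadScheduledExecutor()

      def removeAndDelete(segmentFile: Path): Unit = {
        // 1. (in Kafka) the segment is first dropped from the in-memory segment map, so new reads stop using it
        // 2. rename the file by appending .deleted; readers holding the old file handle are unaffected
        val deleted = Paths.get(segmentFile.toString + ".deleted")
        Files.move(segmentFile, deleted, StandardCopyOption.ATOMIC_MOVE)
        // 3. physically delete later, so a file is never unlinked while a read may still be in flight
        scheduler.schedule(new Runnable {
          def run(): Unit = Files.deleteIfExists(deleted)
        }, 1, TimeUnit.SECONDS)
      }

      removeAndDelete(Files.createTempFile("00000000000000000000", ".log"))
      scheduler.shutdown() // already-scheduled delayed tasks still run before the pool exits
    }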
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org