kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643446123
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1572,144 +1414,69 @@ class Log(@volatile private var _dir: File,
         .map(_.messageOffset)
         .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)
-      roll(Some(rollOffset))
+      localLog.roll(Some(rollOffset), Some(rollAction))
     } else {
       segment
     }
   }
 
   /**
-   * Roll the log over to a new active segment starting with the current logEndOffset.
+   * Roll the local log over to a new active segment starting with the current logEndOffset.
    * This will trim the index to the exact size of the number of entries it currently contains.
    *
    * @return The newly rolled segment
    */
   def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
-    maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
-      val start = time.hiResClockMs()
-      lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
-        val logFile = Log.logFile(dir, newOffset)
-
-        if (segments.contains(newOffset)) {
-          // segment with the same base offset already exists and loaded
-          if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
-            // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
-            // active segment of size zero because of one of the indexes is "full" (due to _maxEntries == 0).
-            warn(s"Trying to roll a new log segment with start offset $newOffset " +
-                 s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " +
-                 s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
-                 s" size of offset index: ${activeSegment.offsetIndex.entries}.")
-            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, LogRoll)
-          } else {
-            throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" +
-                                     s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
-                                     s"segment is ${segments.get(newOffset)}.")
-          }
-        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
-          throw new KafkaException(
-            s"Trying to roll a new log segment for topic partition $topicPartition with " +
-            s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
-        } else {
-          val offsetIdxFile = offsetIndexFile(dir, newOffset)
-          val timeIdxFile = timeIndexFile(dir, newOffset)
-          val txnIdxFile = transactionIndexFile(dir, newOffset)
-
-          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-            warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
-            Files.delete(file.toPath)
-          }
-
-          segments.lastSegment.foreach(_.onBecomeInactiveSegment())
-        }
-
-        // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
-        // offset align with the new segment offset since this ensures we can recover the segment by beginning
-        // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
-        // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
-        // we manually override the state offset here prior to taking the snapshot.
-        producerStateManager.updateMapEndOffset(newOffset)
-        producerStateManager.takeSnapshot()
-
-        val segment = LogSegment.open(dir,
-          baseOffset = newOffset,
-          config,
-          time = time,
-          initFileSize = config.initFileSize,
-          preallocate = config.preallocate)
-        addSegment(segment)
-
-        // We need to update the segment base offset and append position data of the metadata when log rolls.
-        // The next offset should not change.
-        updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
-        // schedule an asynchronous flush of the old segment
-        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
-        info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.")
-
-        segment
-      }
+    lock synchronized {
+      localLog.roll(expectedNextOffset, Some(rollAction))
     }
   }
 
   /**
    * The number of messages appended to the log since the last flush
    */
-  private def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
+  private def unflushedMessages: Long = logEndOffset - localLog.recoveryPoint
 
   /**
-   * Flush all log segments
+   * Flush all local log segments
    */
   def flush(): Unit = flush(this.logEndOffset)
 
   /**
-   * Flush log segments for all offsets up to offset-1
+   * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
   def flush(offset: Long): Unit = {
     maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
-      if (offset > this.recoveryPoint) {
+      if (offset > localLog.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        val segments = logSegments(this.recoveryPoint, offset)
-        segments.foreach(_.flush())
-        // if there are any new segments, we need to flush the parent directory for crash consistency
-        segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
-
+        localLog.flush(offset)
         lock synchronized {
-          checkIfMemoryMappedBufferClosed()
-          if (offset > this.recoveryPoint) {
-            this.recoveryPoint = offset
-            lastFlushedTime.set(time.milliseconds)
-          }
+          localLog.markFlushed(offset)
         }
       }
     }
   }
 
   /**
-   * Completely delete this log directory and all contents from the file system with no delay
+   * Completely delete the local log directory and all contents from the file system with no delay
    */
   private[log] def delete(): Unit = {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
       lock synchronized {
-        checkIfMemoryMappedBufferClosed()
         producerExpireCheck.cancel(true)
-        removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
         leaderEpochCache.foreach(_.clear())
-        Utils.delete(dir)
-        // File handlers will be closed if this log is deleted
-        isMemoryMappedBufferClosed = true
+        val deletedSegments = localLog.delete()
+        deleteProducerSnapshotAsync(deletedSegments)

Review comment:
That's a good point. The best solution I could think of is to split the functionality up into a few different APIs in `LocalLog`.
For example, this is how the right implementation could look in `Log.scala`:

```scala
// In `Log.scala`:
private[log] def delete(): Unit = {
  maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
    lock synchronized {
      producerExpireCheck.cancel(true)
      leaderEpochCache.foreach(_.clear())
      val deletedSegments = localLog.deleteAllSegments()
      deleteProducerSnapshotAsync(deletedSegments)
      localLog.deleteDir()
    }
  }
}
```
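For reference, below is a minimal, self-contained sketch of how the two proposed `LocalLog` APIs could be shaped. The class name `LocalLogSketch`, the `SegmentStub` type, and all method bodies are illustrative assumptions for this discussion, not the actual `kafka.log.LocalLog`/`LogSegment` implementation; only the `deleteAllSegments()`/`deleteDir()` split mirrors the proposal above.

```scala
// Illustrative sketch only: a stand-in for the proposed LocalLog split, not actual Kafka code.
import java.io.File
import java.nio.file.Files
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Minimal stand-in for a log segment: just enough state to model the deletion flow.
final case class SegmentStub(baseOffset: Long, file: File) {
  def close(): Unit = ()  // placeholder for closing the segment file and its indexes
  def deleteIfExists(): Unit = Files.deleteIfExists(file.toPath)
}

final class LocalLogSketch(val dir: File) {
  private val segments = mutable.TreeMap.empty[Long, SegmentStub]

  def addSegment(segment: SegmentStub): Unit = segments.put(segment.baseOffset, segment)

  /** Close and delete every segment, returning the deleted segments so the caller
    * (Log) can schedule follow-up work such as producer-snapshot deletion. */
  def deleteAllSegments(): Iterable[SegmentStub] = {
    val deleted = segments.values.toVector
    deleted.foreach { segment =>
      segment.close()
      segment.deleteIfExists()
    }
    segments.clear()
    deleted
  }

  /** Remove the (now empty) log directory itself; assumes deleteAllSegments() already ran. */
  def deleteDir(): Unit = {
    require(segments.isEmpty, "all segments must be deleted before the directory")
    val stream = Files.walk(dir.toPath)
    val paths = try stream.iterator().asScala.toVector finally stream.close()
    // Delete deepest entries first so every directory is empty by the time it is removed.
    paths.sortBy(_.getNameCount)(Ordering[Int].reverse).foreach(Files.deleteIfExists(_))
  }
}
```

Returning the deleted segments from `deleteAllSegments()` would let `Log` keep ownership of follow-up work such as `deleteProducerSnapshotAsync`, while `deleteDir()` stays a purely local-filesystem step that runs only after the segments are gone.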