kowshik commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r607168537



##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -0,0 +1,423 @@
+package kafka.log
+
+import java.io.{File, IOException}
+import java.nio.file.{Files, NoSuchFileException}
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.log.Log.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, 
isIndexFile, isLogFile, offsetFromFile, offsetFromFileName}
+import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.InvalidOffsetException
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Seq, Set, mutable}
+
+class LogLoader(dir: File,

Review comment:
       Please add class-level documentation (Scaladoc) for `LogLoader`.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2763,6 +2191,320 @@ object Log {
     appendInfo.append(batch, firstOffsetMetadata)
   }
 
+  def maybeCreateLeaderEpochCache(dir: File,
+                                  topicPartition: TopicPartition,
+                                  logDirFailureChannel: LogDirFailureChannel,
+                                  recordVersion: RecordVersion): 
Option[LeaderEpochFileCache] = {
+    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
+
+    def newLeaderEpochFileCache(): LeaderEpochFileCache = {
+      val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, 
logDirFailureChannel)
+      new LeaderEpochFileCache(topicPartition, checkpointFile)
+    }
+
+    if (recordVersion.precedes(RecordVersion.V2)) {
+      val currentCache = if (leaderEpochFile.exists())
+        Some(newLeaderEpochFileCache())
+      else
+        None
+
+      if (currentCache.exists(_.nonEmpty))
+        warn(s"Deleting non-empty leader epoch cache due to incompatible 
message format $recordVersion")
+
+      Files.deleteIfExists(leaderEpochFile.toPath)
+      None
+    } else {
+      Some(newLeaderEpochFileCache())
+    }
+  }
+
+  /**
+   * Swap one or more new segment in place and delete one or more existing 
segments in a crash-safe manner. The old
+   * segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates one or more new segments with suffix .cleaned and 
invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is 
aborted and
+   *        the .cleaned files are deleted on recovery in loadSegments().
+   *   <li> New segments are renamed .swap. If the broker crashes before all 
segments were renamed to .swap, the
+   *        clean-and-swap operation is aborted - .cleaned as well as .swap 
files are deleted on recovery in
+   *        loadSegments(). We detect this situation by maintaining a specific 
order in which files are renamed from
+   *        .cleaned to .swap. Basically, files are renamed in descending 
order of offsets. On recovery, all .swap files
+   *        whose offset is greater than the minimum-offset .clean file are 
deleted.
+   *   <li> If the broker crashes after all new segments were renamed to 
.swap, the operation is completed, the swap
+   *        operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete 
is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted 
on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with 
newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not 
renamed before the crash.
+   *   <li> Swap segment(s) are renamed to replace the existing segments, 
completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind 
are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   *
+   * @param existingSegments The existing segments of the log
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a 
swap file during recovery after a crash
+   */
+  private[log] def replaceSegments(existingSegments: LogSegments,
+                                   newSegments: Seq[LogSegment],
+                                   oldSegments: Seq[LogSegment],
+                                   isRecoveredSwapFile: Boolean = false,
+                                   dir: File,
+                                   topicPartition: TopicPartition,
+                                   config: LogConfig,
+                                   scheduler: Scheduler,
+                                   logDirFailureChannel: LogDirFailureChannel,
+                                   producerStateManager: 
ProducerStateManager): Unit = {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for 
async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those 
segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => 
existingSegments.contains(seg.baseOffset)).sortBy(_.baseOffset)
+
+      // need to do this in two phases to be crash safe AND do the delete 
asynchronously
+      // if we crash in the middle of this we complete the swap in 
loadSegments()
+      if (!isRecoveredSwapFile)
+        
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, 
Log.SwapFileSuffix))
+      sortedNewSegments.reverse.foreach(existingSegments.add(_))
+      val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
+
+      // delete the old files
+      sortedOldSegments.foreach { seg =>
+        // remove the index entry
+        if (seg.baseOffset != sortedNewSegments.head.baseOffset)
+          existingSegments.remove(seg.baseOffset)
+        // delete segment files, but do not delete producer state for segment 
objects which are being replaced.
+        deleteSegmentFiles(
+          List(seg),
+          asyncDelete = true,
+          deleteProducerStateSnapshots = 
!newSegmentBaseOffsets.contains(seg.baseOffset),
+          dir,
+          topicPartition,
+          config,
+          scheduler,
+          logDirFailureChannel,
+          producerStateManager)
+      }
+      // okay we are safe now, remove the swap suffix
+      sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+  }
+
+  /**
+   * Perform physical deletion for the given file. Allows the file to be 
deleted asynchronously or synchronously.
+   *
+   * This method assumes that the file exists and the method is not 
thread-safe.
+   *
+   * This method does not need to convert IOException (thrown from 
changeFileSuffixes) to KafkaStorageException because
+   * it is either called before all logs are loaded or the caller will catch 
and handle IOException
+   *
+   * @throws IOException if the file can't be renamed and still exists
+   */
+  private[log] def deleteSegmentFiles(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      deleteProducerStateSnapshots: Boolean = 
true,
+                                      dir: File,
+                                      topicPartition: TopicPartition,
+                                      config: LogConfig,
+                                      scheduler: Scheduler,
+                                      logDirFailureChannel: 
LogDirFailureChannel,
+                                      producerStateManager: 
ProducerStateManager): Unit = {
+    segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+
+    def deleteSegments(): Unit = {
+      info(s"Deleting segment files ${segments.mkString(",")}")
+      val parentDir = dir.getParent
+      maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while 
deleting segments for $topicPartition in dir $parentDir") {
+        segments.foreach { segment =>
+          segment.deleteIfExists()
+          if (deleteProducerStateSnapshots)
+            producerStateManager.removeAndDeleteSnapshot(segment.baseOffset)
+        }
+      }
+    }
+
+    if (asyncDelete)
+      scheduler.schedule("delete-file", () => deleteSegments(), delay = 
config.fileDeleteDelayMs)
+    else
+      deleteSegments()
+  }
+
+  private def maybeHandleIOException[T](logDirFailureChannel: 
LogDirFailureChannel,
+                                        parentDir: String,
+                                        msg: => String)(fun: => T): T = {
+    if (logDirFailureChannel.hasOfflineLogDir(parentDir)) {
+      throw new KafkaStorageException(s"The log dir $parentDir is offline due 
to a previous IO exception.")
+    }
+    try {
+      fun
+    } catch {
+      case e: IOException =>
+        logDirFailureChannel.maybeAddOfflineLogDir(parentDir, msg, e)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+
+  // Rebuild producer state until lastOffset. This method may be called from 
the recovery code path, and thus must be
+  // free of all side-effects, i.e. it must not update any log-specific state.
+  private[log] def rebuildProducerState(segments: LogSegments,

Review comment:
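       Please document the params of `rebuildProducerState` (the method 
currently has only a `//` comment and no `@param` entries).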

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2763,6 +2191,320 @@ object Log {
     appendInfo.append(batch, firstOffsetMetadata)
   }
 
+  def maybeCreateLeaderEpochCache(dir: File,
+                                  topicPartition: TopicPartition,
+                                  logDirFailureChannel: LogDirFailureChannel,
+                                  recordVersion: RecordVersion): 
Option[LeaderEpochFileCache] = {
+    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
+
+    def newLeaderEpochFileCache(): LeaderEpochFileCache = {
+      val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, 
logDirFailureChannel)
+      new LeaderEpochFileCache(topicPartition, checkpointFile)
+    }
+
+    if (recordVersion.precedes(RecordVersion.V2)) {
+      val currentCache = if (leaderEpochFile.exists())
+        Some(newLeaderEpochFileCache())
+      else
+        None
+
+      if (currentCache.exists(_.nonEmpty))
+        warn(s"Deleting non-empty leader epoch cache due to incompatible 
message format $recordVersion")
+
+      Files.deleteIfExists(leaderEpochFile.toPath)
+      None
+    } else {
+      Some(newLeaderEpochFileCache())
+    }
+  }
+
+  /**
+   * Swap one or more new segment in place and delete one or more existing 
segments in a crash-safe manner. The old
+   * segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates one or more new segments with suffix .cleaned and 
invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is 
aborted and
+   *        the .cleaned files are deleted on recovery in loadSegments().
+   *   <li> New segments are renamed .swap. If the broker crashes before all 
segments were renamed to .swap, the
+   *        clean-and-swap operation is aborted - .cleaned as well as .swap 
files are deleted on recovery in
+   *        loadSegments(). We detect this situation by maintaining a specific 
order in which files are renamed from
+   *        .cleaned to .swap. Basically, files are renamed in descending 
order of offsets. On recovery, all .swap files
+   *        whose offset is greater than the minimum-offset .clean file are 
deleted.
+   *   <li> If the broker crashes after all new segments were renamed to 
.swap, the operation is completed, the swap
+   *        operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete 
is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted 
on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with 
newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not 
renamed before the crash.
+   *   <li> Swap segment(s) are renamed to replace the existing segments, 
completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind 
are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   *
+   * @param existingSegments The existing segments of the log
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a 
swap file during recovery after a crash
+   */
+  private[log] def replaceSegments(existingSegments: LogSegments,
+                                   newSegments: Seq[LogSegment],
+                                   oldSegments: Seq[LogSegment],
+                                   isRecoveredSwapFile: Boolean = false,
+                                   dir: File,
+                                   topicPartition: TopicPartition,
+                                   config: LogConfig,
+                                   scheduler: Scheduler,
+                                   logDirFailureChannel: LogDirFailureChannel,
+                                   producerStateManager: 
ProducerStateManager): Unit = {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for 
async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those 
segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => 
existingSegments.contains(seg.baseOffset)).sortBy(_.baseOffset)
+
+      // need to do this in two phases to be crash safe AND do the delete 
asynchronously
+      // if we crash in the middle of this we complete the swap in 
loadSegments()
+      if (!isRecoveredSwapFile)
+        
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, 
Log.SwapFileSuffix))
+      sortedNewSegments.reverse.foreach(existingSegments.add(_))
+      val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
+
+      // delete the old files
+      sortedOldSegments.foreach { seg =>
+        // remove the index entry
+        if (seg.baseOffset != sortedNewSegments.head.baseOffset)
+          existingSegments.remove(seg.baseOffset)
+        // delete segment files, but do not delete producer state for segment 
objects which are being replaced.
+        deleteSegmentFiles(
+          List(seg),
+          asyncDelete = true,
+          deleteProducerStateSnapshots = 
!newSegmentBaseOffsets.contains(seg.baseOffset),
+          dir,
+          topicPartition,
+          config,
+          scheduler,
+          logDirFailureChannel,
+          producerStateManager)
+      }
+      // okay we are safe now, remove the swap suffix
+      sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+  }
+
+  /**
+   * Perform physical deletion for the given file. Allows the file to be 
deleted asynchronously or synchronously.
+   *
+   * This method assumes that the file exists and the method is not 
thread-safe.
+   *
+   * This method does not need to convert IOException (thrown from 
changeFileSuffixes) to KafkaStorageException because
+   * it is either called before all logs are loaded or the caller will catch 
and handle IOException
+   *
+   * @throws IOException if the file can't be renamed and still exists
+   */
+  private[log] def deleteSegmentFiles(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      deleteProducerStateSnapshots: Boolean = 
true,
+                                      dir: File,
+                                      topicPartition: TopicPartition,
+                                      config: LogConfig,
+                                      scheduler: Scheduler,
+                                      logDirFailureChannel: 
LogDirFailureChannel,
+                                      producerStateManager: 
ProducerStateManager): Unit = {
+    segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+
+    def deleteSegments(): Unit = {
+      info(s"Deleting segment files ${segments.mkString(",")}")
+      val parentDir = dir.getParent
+      maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while 
deleting segments for $topicPartition in dir $parentDir") {
+        segments.foreach { segment =>
+          segment.deleteIfExists()
+          if (deleteProducerStateSnapshots)
+            producerStateManager.removeAndDeleteSnapshot(segment.baseOffset)
+        }
+      }
+    }
+
+    if (asyncDelete)
+      scheduler.schedule("delete-file", () => deleteSegments(), delay = 
config.fileDeleteDelayMs)
+    else
+      deleteSegments()
+  }
+
+  private def maybeHandleIOException[T](logDirFailureChannel: 
LogDirFailureChannel,
+                                        parentDir: String,
+                                        msg: => String)(fun: => T): T = {
+    if (logDirFailureChannel.hasOfflineLogDir(parentDir)) {
+      throw new KafkaStorageException(s"The log dir $parentDir is offline due 
to a previous IO exception.")
+    }
+    try {
+      fun
+    } catch {
+      case e: IOException =>
+        logDirFailureChannel.maybeAddOfflineLogDir(parentDir, msg, e)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+
+  // Rebuild producer state until lastOffset. This method may be called from 
the recovery code path, and thus must be
+  // free of all side-effects, i.e. it must not update any log-specific state.
+  private[log] def rebuildProducerState(segments: LogSegments,
+                                        logStartOffset: Long,
+                                        lastOffset: Long,
+                                        recordVersion: RecordVersion,
+                                        time: Time,
+                                        reloadFromCleanShutdown: Boolean,
+                                        producerStateManager: 
ProducerStateManager): Unit = {
+    val allSegments = segments.values
+    val offsetsToSnapshot =
+      if (allSegments.nonEmpty) {
+        val nextLatestSegmentBaseOffset = 
segments.lowerSegment(allSegments.last.baseOffset).map(_.baseOffset)
+        Seq(nextLatestSegmentBaseOffset, Some(allSegments.last.baseOffset), 
Some(lastOffset))
+      } else {
+        Seq(Some(lastOffset))
+      }
+    info(s"Loading producer state till offset $lastOffset with message format 
version ${recordVersion.value}")
+
+    // We want to avoid unnecessary scanning of the log to build the producer 
state when the broker is being
+    // upgraded. The basic idea is to use the absence of producer snapshot 
files to detect the upgrade case,
+    // but we have to be careful not to assume too much in the presence of 
broker failures. The two most common
+    // upgrade cases in which we expect to find no snapshots are the following:
+    //
+    // 1. The broker has been upgraded, but the topic is still on the old 
message format.
+    // 2. The broker has been upgraded, the topic is on the new message 
format, and we had a clean shutdown.
+    //
+    // If we hit either of these cases, we skip producer state loading and 
write a new snapshot at the log end
+    // offset (see below). The next time the log is reloaded, we will load 
producer state using this snapshot
+    // (or later snapshots). Otherwise, if there is no snapshot file, then we 
have to rebuild producer state
+    // from the first segment.
+    if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
+      (producerStateManager.latestSnapshotOffset.isEmpty && 
reloadFromCleanShutdown)) {
+      // To avoid an expensive scan through all of the segments, we take empty 
snapshots from the start of the
+      // last two segments and the last offset. This should avoid the full 
scan in the case that the log needs
+      // truncation.
+      offsetsToSnapshot.flatten.foreach { offset =>
+        producerStateManager.updateMapEndOffset(offset)
+        producerStateManager.takeSnapshot()
+      }
+    } else {
+      info(s"Reloading from producer snapshot and rebuilding producer state 
from offset $lastOffset")
+      val isEmptyBeforeTruncation = producerStateManager.isEmpty && 
producerStateManager.mapEndOffset >= lastOffset
+      val producerStateLoadStart = time.milliseconds()
+      producerStateManager.truncateAndReload(logStartOffset, lastOffset, 
time.milliseconds())
+      val segmentRecoveryStart = time.milliseconds()
+
+      // Only do the potentially expensive reloading if the last snapshot 
offset is lower than the log end
+      // offset (which would be the case on first startup) and there were 
active producers prior to truncation
+      // (which could be the case if truncating after initial loading). If 
there weren't, then truncating
+      // shouldn't change that fact (although it could cause a producerId to 
expire earlier than expected),
+      // and we can skip the loading. This is an optimization for users which 
are not yet using
+      // idempotent/transactional features yet.
+      if (lastOffset > producerStateManager.mapEndOffset && 
!isEmptyBeforeTruncation) {
+        val segmentOfLastOffset = segments.floorSegment(lastOffset)
+
+        segments.values(producerStateManager.mapEndOffset, lastOffset).foreach 
{ segment =>
+          val startOffset = Utils.max(segment.baseOffset, 
producerStateManager.mapEndOffset, logStartOffset)
+          producerStateManager.updateMapEndOffset(startOffset)
+
+          if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
+            producerStateManager.takeSnapshot()
+
+          val maxPosition = if (segmentOfLastOffset.contains(segment)) {
+            Option(segment.translateOffset(lastOffset))
+              .map(_.position)
+              .getOrElse(segment.size)
+          } else {
+            segment.size
+          }
+
+          val fetchDataInfo = segment.read(startOffset,
+            maxSize = Int.MaxValue,
+            maxPosition = maxPosition,
+            minOneMessage = false)
+          if (fetchDataInfo != null)
+            loadProducersFromRecords(producerStateManager, 
fetchDataInfo.records)
+        }
+      }
+      producerStateManager.updateMapEndOffset(lastOffset)
+      producerStateManager.takeSnapshot()
+      info(s"Producer state recovery took ${producerStateLoadStart - 
segmentRecoveryStart}ms for snapshot load " +
+        s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment 
recovery from offset $lastOffset")
+    }
+  }
+
+  /**
+   * Split a segment into one or more segments such that there is no offset 
overflow in any of them. The
+   * resulting segments will contain the exact same messages that are present 
in the input segment. On successful
+   * completion of this method, the input segment will be deleted and will be 
replaced by the resulting new segments.
+   * See replaceSegments for recovery logic, in case the broker dies in the 
middle of this operation.
+   * <p>Note that this method assumes we have already determined that the 
segment passed in contains records that cause
+   * offset overflow.</p>
+   * <p>The split logic overloads the use of .clean files that LogCleaner 
typically uses to make the process of replacing
+   * the input segment with multiple new segments atomic and recoverable in 
the event of a crash. See replaceSegments
+   * and completeSwapOperations for the implementation to make this operation 
recoverable on crashes.</p>
+   * @param segment Segment to split
+   * @return List of new segments that replace the input segment
+   */
+  private[log] def splitOverflowedSegment(segment: LogSegment,

Review comment:
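       Please document the params of `splitOverflowedSegment`; the Scaladoc 
currently has a `@param` entry only for `segment`.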

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2763,6 +2191,320 @@ object Log {
     appendInfo.append(batch, firstOffsetMetadata)
   }
 
+  def maybeCreateLeaderEpochCache(dir: File,
+                                  topicPartition: TopicPartition,
+                                  logDirFailureChannel: LogDirFailureChannel,
+                                  recordVersion: RecordVersion): 
Option[LeaderEpochFileCache] = {
+    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
+
+    def newLeaderEpochFileCache(): LeaderEpochFileCache = {
+      val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, 
logDirFailureChannel)
+      new LeaderEpochFileCache(topicPartition, checkpointFile)
+    }
+
+    if (recordVersion.precedes(RecordVersion.V2)) {
+      val currentCache = if (leaderEpochFile.exists())
+        Some(newLeaderEpochFileCache())
+      else
+        None
+
+      if (currentCache.exists(_.nonEmpty))
+        warn(s"Deleting non-empty leader epoch cache due to incompatible 
message format $recordVersion")
+
+      Files.deleteIfExists(leaderEpochFile.toPath)
+      None
+    } else {
+      Some(newLeaderEpochFileCache())
+    }
+  }
+
+  /**
+   * Swap one or more new segment in place and delete one or more existing 
segments in a crash-safe manner. The old
+   * segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates one or more new segments with suffix .cleaned and 
invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is 
aborted and
+   *        the .cleaned files are deleted on recovery in loadSegments().
+   *   <li> New segments are renamed .swap. If the broker crashes before all 
segments were renamed to .swap, the
+   *        clean-and-swap operation is aborted - .cleaned as well as .swap 
files are deleted on recovery in
+   *        loadSegments(). We detect this situation by maintaining a specific 
order in which files are renamed from
+   *        .cleaned to .swap. Basically, files are renamed in descending 
order of offsets. On recovery, all .swap files
+   *        whose offset is greater than the minimum-offset .clean file are 
deleted.
+   *   <li> If the broker crashes after all new segments were renamed to 
.swap, the operation is completed, the swap
+   *        operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete 
is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted 
on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with 
newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not 
renamed before the crash.
+   *   <li> Swap segment(s) are renamed to replace the existing segments, 
completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind 
are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   *
+   * @param existingSegments The existing segments of the log
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a 
swap file during recovery after a crash
+   */
+  private[log] def replaceSegments(existingSegments: LogSegments,

Review comment:
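       Please document the params of `replaceSegments` that lack a `@param` 
entry: `dir`, `topicPartition`, `config`, `scheduler`, `logDirFailureChannel` 
and `producerStateManager`.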

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2566,10 +1979,18 @@ object Log {
             logDirFailureChannel: LogDirFailureChannel,
             lastShutdownClean: Boolean = true,
             keepPartitionMetadataFile: Boolean = true): Log = {
+    // create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
     val topicPartition = Log.parseTopicPartitionName(dir)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, 
topicPartition, logDirFailureChannel,
+      config.messageFormatVersion.recordVersion)
     val producerStateManager = new ProducerStateManager(topicPartition, dir, 
maxProducerIdExpirationMs)
-    new Log(dir, config, logStartOffset, recoveryPoint, scheduler, 
brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, 
producerStateManager, logDirFailureChannel, lastShutdownClean, 
keepPartitionMetadataFile)
+    val logLoader = new LogLoader(dir, config, logStartOffset, recoveryPoint, 
scheduler, time, maxProducerIdExpirationMs,

Review comment:
       Should the LogLoader also instantiate leaderEpochCache and 
producerStateManager?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2763,6 +2191,320 @@ object Log {
     appendInfo.append(batch, firstOffsetMetadata)
   }
 
+  def maybeCreateLeaderEpochCache(dir: File,
+                                  topicPartition: TopicPartition,
+                                  logDirFailureChannel: LogDirFailureChannel,
+                                  recordVersion: RecordVersion): 
Option[LeaderEpochFileCache] = {
+    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
+
+    def newLeaderEpochFileCache(): LeaderEpochFileCache = {
+      val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, 
logDirFailureChannel)
+      new LeaderEpochFileCache(topicPartition, checkpointFile)
+    }
+
+    if (recordVersion.precedes(RecordVersion.V2)) {
+      val currentCache = if (leaderEpochFile.exists())
+        Some(newLeaderEpochFileCache())
+      else
+        None
+
+      if (currentCache.exists(_.nonEmpty))
+        warn(s"Deleting non-empty leader epoch cache due to incompatible 
message format $recordVersion")
+
+      Files.deleteIfExists(leaderEpochFile.toPath)
+      None
+    } else {
+      Some(newLeaderEpochFileCache())
+    }
+  }
+
+  /**
+   * Swap one or more new segment in place and delete one or more existing 
segments in a crash-safe manner. The old
+   * segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates one or more new segments with suffix .cleaned and 
invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is 
aborted and
+   *        the .cleaned files are deleted on recovery in loadSegments().
+   *   <li> New segments are renamed .swap. If the broker crashes before all 
segments were renamed to .swap, the
+   *        clean-and-swap operation is aborted - .cleaned as well as .swap 
files are deleted on recovery in
+   *        loadSegments(). We detect this situation by maintaining a specific 
order in which files are renamed from
+   *        .cleaned to .swap. Basically, files are renamed in descending 
order of offsets. On recovery, all .swap files
+   *        whose offset is greater than the minimum-offset .clean file are 
deleted.
+   *   <li> If the broker crashes after all new segments were renamed to 
.swap, the operation is completed, the swap
+   *        operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete 
is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted 
on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with 
newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not 
renamed before the crash.
+   *   <li> Swap segment(s) are renamed to replace the existing segments, 
completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind 
are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   *
+   * @param existingSegments The existing segments of the log
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a 
swap file during recovery after a crash
+   */
+  private[log] def replaceSegments(existingSegments: LogSegments,
+                                   newSegments: Seq[LogSegment],
+                                   oldSegments: Seq[LogSegment],
+                                   isRecoveredSwapFile: Boolean = false,
+                                   dir: File,
+                                   topicPartition: TopicPartition,
+                                   config: LogConfig,
+                                   scheduler: Scheduler,
+                                   logDirFailureChannel: LogDirFailureChannel,
+                                   producerStateManager: 
ProducerStateManager): Unit = {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for 
async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those 
segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => 
existingSegments.contains(seg.baseOffset)).sortBy(_.baseOffset)
+
+      // need to do this in two phases to be crash safe AND do the delete 
asynchronously
+      // if we crash in the middle of this we complete the swap in 
loadSegments()
+      if (!isRecoveredSwapFile)
+        
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, 
Log.SwapFileSuffix))
+      sortedNewSegments.reverse.foreach(existingSegments.add(_)) // new segments are added to the index in descending base offset order
+      val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet // consulted below to decide whether producer state snapshots are kept
+
+      // delete the old files: each old segment is handed to deleteSegmentFiles() with asyncDelete = true
+      sortedOldSegments.foreach { seg =>
+        // remove the index entry, unless the first new segment has the same base offset (NOTE(review): presumably add() above already replaced that entry - confirm)
+        if (seg.baseOffset != sortedNewSegments.head.baseOffset)
+          existingSegments.remove(seg.baseOffset)
+        // delete segment files, but do not delete producer state for segment 
objects which are being replaced.
+        deleteSegmentFiles(
+          List(seg),
+          asyncDelete = true,
+          deleteProducerStateSnapshots = 
!newSegmentBaseOffsets.contains(seg.baseOffset),
+          dir,
+          topicPartition,
+          config,
+          scheduler,
+          logDirFailureChannel,
+          producerStateManager)
+      }
+      // okay we are safe now, remove the swap suffix from the new segments (in ascending base offset order)
+      sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+  }
+
+  /**
+   * Perform physical deletion for the given file. Allows the file to be 
deleted asynchronously or synchronously.
+   *
+   * This method assumes that the file exists and the method is not 
thread-safe.
+   *
+   * This method does not need to convert IOException (thrown from 
changeFileSuffixes) to KafkaStorageException because
+   * it is either called before all logs are loaded or the caller will catch 
and handle IOException
+   *
+   * @throws IOException if the file can't be renamed and still exists
+   */
+  private[log] def deleteSegmentFiles(segments: Iterable[LogSegment],

Review comment:
       Document the params




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to