kowshik commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r607168537
########## File path: core/src/main/scala/kafka/log/LogLoader.scala ########## @@ -0,0 +1,423 @@ +package kafka.log + +import java.io.{File, IOException} +import java.nio.file.{Files, NoSuchFileException} + +import kafka.common.LogSegmentOffsetOverflowException +import kafka.log.Log.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile, offsetFromFileName} +import kafka.server.{LogDirFailureChannel, LogOffsetMetadata} +import kafka.server.epoch.LeaderEpochFileCache +import kafka.utils.{CoreUtils, Logging, Scheduler} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.InvalidOffsetException +import org.apache.kafka.common.utils.Time + +import scala.collection.{Seq, Set, mutable} + +class LogLoader(dir: File, Review comment: Documentation ########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -2763,6 +2191,320 @@ object Log { appendInfo.append(batch, firstOffsetMetadata) } + def maybeCreateLeaderEpochCache(dir: File, + topicPartition: TopicPartition, + logDirFailureChannel: LogDirFailureChannel, + recordVersion: RecordVersion): Option[LeaderEpochFileCache] = { + val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) + + def newLeaderEpochFileCache(): LeaderEpochFileCache = { + val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) + new LeaderEpochFileCache(topicPartition, checkpointFile) + } + + if (recordVersion.precedes(RecordVersion.V2)) { + val currentCache = if (leaderEpochFile.exists()) + Some(newLeaderEpochFileCache()) + else + None + + if (currentCache.exists(_.nonEmpty)) + warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") + + Files.deleteIfExists(leaderEpochFile.toPath) + None + } else { + Some(newLeaderEpochFileCache()) + } + } + + /** + * Swap one or more new segment in place and delete one or more existing segments in a crash-safe manner. The old + * segments will be asynchronously deleted. + * + * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded + * or the caller will catch and handle IOException + * + * The sequence of operations is: + * <ol> + * <li> Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments(). + * If broker crashes at this point, the clean-and-swap operation is aborted and + * the .cleaned files are deleted on recovery in loadSegments(). + * <li> New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the + * clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in + * loadSegments(). We detect this situation by maintaining a specific order in which files are renamed from + * .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery, all .swap files + * whose offset is greater than the minimum-offset .clean file are deleted. + * <li> If the broker crashes after all new segments were renamed to .swap, the operation is completed, the swap + * operation is resumed on recovery as described in the next step. + * <li> Old segment files are renamed to .deleted and asynchronous delete is scheduled. + * If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments(). 
+ * replaceSegments() is then invoked to complete the swap with newSegment recreated from + * the .swap file and oldSegments containing segments which were not renamed before the crash. + * <li> Swap segment(s) are renamed to replace the existing segments, completing this operation. + * If the broker crashes, any .deleted files which may be left behind are deleted + * on recovery in loadSegments(). + * </ol> + * + * @param existingSegments The existing segments of the log + * @param newSegments The new log segment to add to the log + * @param oldSegments The old log segments to delete from the log + * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash + */ + private[log] def replaceSegments(existingSegments: LogSegments, + newSegments: Seq[LogSegment], + oldSegments: Seq[LogSegment], + isRecoveredSwapFile: Boolean = false, + dir: File, + topicPartition: TopicPartition, + config: LogConfig, + scheduler: Scheduler, + logDirFailureChannel: LogDirFailureChannel, + producerStateManager: ProducerStateManager): Unit = { + val sortedNewSegments = newSegments.sortBy(_.baseOffset) + // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments + // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment() + // multiple times for the same segment. + val sortedOldSegments = oldSegments.filter(seg => existingSegments.contains(seg.baseOffset)).sortBy(_.baseOffset) + + // need to do this in two phases to be crash safe AND do the delete asynchronously + // if we crash in the middle of this we complete the swap in loadSegments() + if (!isRecoveredSwapFile) + sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)) + sortedNewSegments.reverse.foreach(existingSegments.add(_)) + val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet + + // delete the old files + sortedOldSegments.foreach { seg => + // remove the index entry + if (seg.baseOffset != sortedNewSegments.head.baseOffset) + existingSegments.remove(seg.baseOffset) + // delete segment files, but do not delete producer state for segment objects which are being replaced. + deleteSegmentFiles( + List(seg), + asyncDelete = true, + deleteProducerStateSnapshots = !newSegmentBaseOffsets.contains(seg.baseOffset), + dir, + topicPartition, + config, + scheduler, + logDirFailureChannel, + producerStateManager) + } + // okay we are safe now, remove the swap suffix + sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "")) + } + + /** + * Perform physical deletion for the given file. Allows the file to be deleted asynchronously or synchronously. + * + * This method assumes that the file exists and the method is not thread-safe. 
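// Editor's illustration of the clean-and-swap ordering described in the replaceSegments() doc above.
// Hypothetical helper names, not code from this PR; offsetFromFile below is a simplified stand-in
// for Log.offsetFromFile, assuming file names of the form "00000000000000000123.log.cleaned".
import java.io.File
import java.nio.file.{Files, Paths}

object SwapRenameSketch {
  private def offsetFromFile(f: File): Long =
    f.getName.takeWhile(_ != '.').toLong // e.g. "00000000000000000123.log.cleaned" -> 123

  // Rename highest-offset files first, so that if the broker crashes part-way through, every
  // remaining .cleaned file has an offset no larger than any .swap file already renamed.
  def renameCleanedToSwap(cleanedFiles: Seq[File]): Unit =
    cleanedFiles.sortBy(offsetFromFile).reverse.foreach { f =>
      Files.move(f.toPath, Paths.get(f.getPath.stripSuffix(".cleaned") + ".swap"))
    }

  // On recovery, .swap files whose offset is not below the minimum .cleaned offset belong to an
  // aborted clean-and-swap operation and can be discarded, per the rule described above.
  def abortedSwapFiles(cleanedFiles: Seq[File], swapFiles: Seq[File]): Seq[File] = {
    val minCleanedOffset = cleanedFiles.map(offsetFromFile).sorted.headOption.getOrElse(Long.MaxValue)
    swapFiles.filter(f => offsetFromFile(f) >= minCleanedOffset)
  }
}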
+   *
+   * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
+   * it is either called before all logs are loaded or the caller will catch and handle IOException
+   *
+   * @throws IOException if the file can't be renamed and still exists
+   */
+  private[log] def deleteSegmentFiles(segments: Iterable[LogSegment],
+                                      asyncDelete: Boolean,
+                                      deleteProducerStateSnapshots: Boolean = true,
+                                      dir: File,
+                                      topicPartition: TopicPartition,
+                                      config: LogConfig,
+                                      scheduler: Scheduler,
+                                      logDirFailureChannel: LogDirFailureChannel,
+                                      producerStateManager: ProducerStateManager): Unit = {
+    segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
+
+    def deleteSegments(): Unit = {
+      info(s"Deleting segment files ${segments.mkString(",")}")
+      val parentDir = dir.getParent
+      maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
+        segments.foreach { segment =>
+          segment.deleteIfExists()
+          if (deleteProducerStateSnapshots)
+            producerStateManager.removeAndDeleteSnapshot(segment.baseOffset)
+        }
+      }
+    }
+
+    if (asyncDelete)
+      scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
+    else
+      deleteSegments()
+  }
+
+  private def maybeHandleIOException[T](logDirFailureChannel: LogDirFailureChannel,
+                                        parentDir: String,
+                                        msg: => String)(fun: => T): T = {
+    if (logDirFailureChannel.hasOfflineLogDir(parentDir)) {
+      throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
+    }
+    try {
+      fun
+    } catch {
+      case e: IOException =>
+        logDirFailureChannel.maybeAddOfflineLogDir(parentDir, msg, e)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+
+  // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
+  // free of all side-effects, i.e. it must not update any log-specific state.
+  private[log] def rebuildProducerState(segments: LogSegments,

Review comment:
       Document the params

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2763,6 +2191,320 @@ object Log {
+  // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
+  // free of all side-effects, i.e. it must not update any log-specific state.
+ private[log] def rebuildProducerState(segments: LogSegments, + logStartOffset: Long, + lastOffset: Long, + recordVersion: RecordVersion, + time: Time, + reloadFromCleanShutdown: Boolean, + producerStateManager: ProducerStateManager): Unit = { + val allSegments = segments.values + val offsetsToSnapshot = + if (allSegments.nonEmpty) { + val nextLatestSegmentBaseOffset = segments.lowerSegment(allSegments.last.baseOffset).map(_.baseOffset) + Seq(nextLatestSegmentBaseOffset, Some(allSegments.last.baseOffset), Some(lastOffset)) + } else { + Seq(Some(lastOffset)) + } + info(s"Loading producer state till offset $lastOffset with message format version ${recordVersion.value}") + + // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being + // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case, + // but we have to be careful not to assume too much in the presence of broker failures. The two most common + // upgrade cases in which we expect to find no snapshots are the following: + // + // 1. The broker has been upgraded, but the topic is still on the old message format. + // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown. + // + // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end + // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot + // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state + // from the first segment. + if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 || + (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { + // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the + // last two segments and the last offset. This should avoid the full scan in the case that the log needs + // truncation. + offsetsToSnapshot.flatten.foreach { offset => + producerStateManager.updateMapEndOffset(offset) + producerStateManager.takeSnapshot() + } + } else { + info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset") + val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset + val producerStateLoadStart = time.milliseconds() + producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) + val segmentRecoveryStart = time.milliseconds() + + // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end + // offset (which would be the case on first startup) and there were active producers prior to truncation + // (which could be the case if truncating after initial loading). If there weren't, then truncating + // shouldn't change that fact (although it could cause a producerId to expire earlier than expected), + // and we can skip the loading. This is an optimization for users which are not yet using + // idempotent/transactional features yet. 
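// (Editorial illustration, not part of the diff) Concretely: the per-segment scan below is skipped
// when the latest snapshot already reaches the log end offset (producerStateManager.mapEndOffset >= lastOffset),
// or when the producer state map was empty before truncateAndReload and already covered lastOffset.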
+ if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) { + val segmentOfLastOffset = segments.floorSegment(lastOffset) + + segments.values(producerStateManager.mapEndOffset, lastOffset).foreach { segment => + val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset) + producerStateManager.updateMapEndOffset(startOffset) + + if (offsetsToSnapshot.contains(Some(segment.baseOffset))) + producerStateManager.takeSnapshot() + + val maxPosition = if (segmentOfLastOffset.contains(segment)) { + Option(segment.translateOffset(lastOffset)) + .map(_.position) + .getOrElse(segment.size) + } else { + segment.size + } + + val fetchDataInfo = segment.read(startOffset, + maxSize = Int.MaxValue, + maxPosition = maxPosition, + minOneMessage = false) + if (fetchDataInfo != null) + loadProducersFromRecords(producerStateManager, fetchDataInfo.records) + } + } + producerStateManager.updateMapEndOffset(lastOffset) + producerStateManager.takeSnapshot() + info(s"Producer state recovery took ${producerStateLoadStart - segmentRecoveryStart}ms for snapshot load " + + s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset") + } + } + + /** + * Split a segment into one or more segments such that there is no offset overflow in any of them. The + * resulting segments will contain the exact same messages that are present in the input segment. On successful + * completion of this method, the input segment will be deleted and will be replaced by the resulting new segments. + * See replaceSegments for recovery logic, in case the broker dies in the middle of this operation. + * <p>Note that this method assumes we have already determined that the segment passed in contains records that cause + * offset overflow.</p> + * <p>The split logic overloads the use of .clean files that LogCleaner typically uses to make the process of replacing + * the input segment with multiple new segments atomic and recoverable in the event of a crash. See replaceSegments + * and completeSwapOperations for the implementation to make this operation recoverable on crashes.</p> + * @param segment Segment to split + * @return List of new segments that replace the input segment + */ + private[log] def splitOverflowedSegment(segment: LogSegment, Review comment: Document the params ########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -2763,6 +2191,320 @@ object Log { appendInfo.append(batch, firstOffsetMetadata) } + def maybeCreateLeaderEpochCache(dir: File, + topicPartition: TopicPartition, + logDirFailureChannel: LogDirFailureChannel, + recordVersion: RecordVersion): Option[LeaderEpochFileCache] = { + val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) + + def newLeaderEpochFileCache(): LeaderEpochFileCache = { + val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) + new LeaderEpochFileCache(topicPartition, checkpointFile) + } + + if (recordVersion.precedes(RecordVersion.V2)) { + val currentCache = if (leaderEpochFile.exists()) + Some(newLeaderEpochFileCache()) + else + None + + if (currentCache.exists(_.nonEmpty)) + warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") + + Files.deleteIfExists(leaderEpochFile.toPath) + None + } else { + Some(newLeaderEpochFileCache()) + } + } + + /** + * Swap one or more new segment in place and delete one or more existing segments in a crash-safe manner. 
 The old
+   *   segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * @param existingSegments The existing segments of the log
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
+   */
+  private[log] def replaceSegments(existingSegments: LogSegments,

Review comment:
       Document the params

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2566,10 +1979,18 @@
              logDirFailureChannel: LogDirFailureChannel,
              lastShutdownClean: Boolean = true,
              keepPartitionMetadataFile: Boolean = true): Log = {
+    // create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
     val topicPartition = Log.parseTopicPartitionName(dir)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel,
+      config.messageFormatVersion.recordVersion)
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
-    new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, keepPartitionMetadataFile)
+    val logLoader = new LogLoader(dir, config, logStartOffset, recoveryPoint, scheduler, time, maxProducerIdExpirationMs,

Review comment:
       Should the LogLoader also instantiate leaderEpochCache and producerStateManager?
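One possible shape for that (a rough sketch with illustrative names and elided parameters, not the API this PR implements): LogLoader would build both collaborators during load() and return them to Log.apply(), so the factory method only passes configuration through.

// Hypothetical sketch only; LoadedState and the constructor shape are made up for illustration.
case class LoadedState(segments: LogSegments,
                       leaderEpochCache: Option[LeaderEpochFileCache],
                       producerStateManager: ProducerStateManager)

// Inside Log.apply(), roughly:
//   val loaded = new LogLoader(dir, topicPartition, config, scheduler, time,
//     maxProducerIdExpirationMs, logDirFailureChannel).load()
//   new Log(dir, config, /* ... */, loaded.leaderEpochCache, loaded.producerStateManager, /* ... */)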
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2763,6 +2191,320 @@ object Log {
+  /**
+   * Perform physical deletion for the given file. Allows the file to be deleted asynchronously or synchronously.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
+   *
+   * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
+   * it is either called before all logs are loaded or the caller will catch and handle IOException
+   *
+   * @throws IOException if the file can't be renamed and still exists
+   */
+  private[log] def deleteSegmentFiles(segments: Iterable[LogSegment],

Review comment:
       Document the params

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
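For the repeated "Document the params" asks, one possible wording for deleteSegmentFiles (a suggestion only; descriptions are inferred from the surrounding code, not taken from the PR):

/**
 * @param segments The log segments to schedule for deletion
 * @param asyncDelete Whether the segment files should be deleted asynchronously
 * @param deleteProducerStateSnapshots Whether the producer state snapshot files of these segments should also be deleted
 * @param dir The directory in which the log resides
 * @param topicPartition The topic partition associated with the log
 * @param config The log configuration settings
 * @param scheduler The scheduler used for asynchronous deletion
 * @param logDirFailureChannel The channel used to mark the log directory offline on IO errors
 * @param producerStateManager The ProducerStateManager tracking producer state for this log
 */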