[
https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhanxiang (Patrick) Huang reassigned KAFKA-7452:
------------------------------------------------
Assignee: Zhanxiang (Patrick) Huang
> Deleting snapshot files after check-pointing log recovery offsets can slow
> down replication when truncation happens
> -------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-7452
> URL: https://issues.apache.org/jira/browse/KAFKA-7452
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Major
>
> After KAFKA-5829, Kafka iterates through all the partition dirs to delete
> useless snapshot files in "checkpointLogRecoveryOffsetsInDir".
> Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following
> places:
> # Truncation
> # Log dir deletion and movement
> # Background thread checkpointing recovery offsets
> In a 2.0 deployment on a cluster with 10k partitions per broker, we found
> that deleting useless snapshot files in the critical path of log truncation
> can significantly slow down followers catching up with the leader during a
> rolling bounce (~2x slower than 0.11). The reason is that we basically do an
> "ls -R" over the whole data directory only to potentially delete the snapshot
> files in one partition directory, because the way we identify snapshot files
> is to list each directory and check the filename suffix.
> In our case, "listSnapshotFiles" takes ~1ms per partition directory, so
> deleting the snapshot files after truncating a single partition costs
> ~1ms * 10k = ~10s of directory listings, which delays subsequent fetches in
> the fetcher thread.
> Here are the related code snippets:
> LogManager.scala
>
> {code:java}
> private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
>   for {
>     partitionToLog <- logsByDir.get(dir.getAbsolutePath)
>     checkpoint <- recoveryPointCheckpoints.get(dir)
>   } {
>     try {
>       checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
>       // Iterates over every log on the broker, listing each partition dir,
>       // even though only one log dir is being checkpointed
>       allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
>     } catch {
>       case e: IOException =>
>         logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
>           s"Disk error while writing to recovery point file in directory $dir", e)
>     }
>   }
> }
> {code}
>
> ProducerStateManager.scala
>
> {code:java}
> private[log] def listSnapshotFiles(dir: File): Seq[File] = {
>   if (dir.exists && dir.isDirectory) {
>     Option(dir.listFiles).map { files =>
>       files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
>     }.getOrElse(Seq.empty)
>   } else Seq.empty
> }
>
> private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
>   listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
>     Files.deleteIfExists(file.toPath)
>   }
> }
> {code}
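>
> To put a number on the listing cost, here is a minimal, self-contained sketch
> (not from the Kafka codebase; ListSnapshotsBench and the hard-coded
> ".snapshot" suffix check are stand-ins for the real helpers) that times the
> same list-and-filter pattern over every partition directory:
>
> {code:java}
> import java.io.File
>
> object ListSnapshotsBench {
>   // Stand-in for ProducerStateManager.isSnapshotFile
>   private def isSnapshotFile(f: File): Boolean = f.getName.endsWith(".snapshot")
>
>   def main(args: Array[String]): Unit = {
>     val dataDir = new File(args(0)) // e.g. one of the broker's log.dirs
>     val partitionDirs = Option(dataDir.listFiles).getOrElse(Array.empty[File]).filter(_.isDirectory)
>     val start = System.nanoTime()
>     // Same pattern as listSnapshotFiles: list every file, filter by suffix
>     val snapshotCount = partitionDirs.map { dir =>
>       Option(dir.listFiles).getOrElse(Array.empty[File]).count(f => f.isFile && isSnapshotFile(f))
>     }.sum
>     val elapsedMs = (System.nanoTime() - start) / 1e6
>     println(f"$snapshotCount snapshot files in ${partitionDirs.length} dirs, " +
>       f"$elapsedMs%.1f ms total (${elapsedMs / math.max(partitionDirs.length, 1)}%.3f ms/dir)")
>   }
> }
> {code}
>
> At ~1ms per directory, the per-directory time multiplied by 10k partitions
> matches the ~10s we observed.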
>
> There are a few things that can be optimized here:
> # We can keep an in-memory cache of the snapshot file metadata (filenames)
> in ProducerStateManager to avoid calling dir.listFiles in
> "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile" (see
> the sketch after this list).
> # After truncation, we can try to delete snapshot files only for the
> truncated partitions (in the replica fetcher thread, we truncate one
> partition at a time) instead of for all partitions. Or maybe we don't even
> need to delete snapshot files in the critical path of truncation, because
> the background log-recovery-offset-checkpoint-thread will do it
> periodically. The same applies to log deletion/movement.
> # If we want to further optimize the actual snapshot file deletion, we can
> make it async. But I am not sure whether it is needed after we have 1) and 2).
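> As a rough illustration of ideas 1) and 2), a hypothetical per-log cache
> (SnapshotCache below is not existing Kafka code, just a sketch of the shape)
> could serve deletions from memory, so the truncation path only touches the
> truncated partition's state and never calls dir.listFiles:
>
> {code:java}
> import java.io.File
> import java.nio.file.Files
> import scala.collection.concurrent.TrieMap
>
> // One instance per log/partition, seeded by a single listFiles at startup
> class SnapshotCache {
>   private val snapshots = TrieMap.empty[Long, File] // base offset -> snapshot file
>
>   def record(offset: Long, file: File): Unit = snapshots.put(offset, file)
>
>   def latestSnapshotFile: Option[File] =
>     if (snapshots.isEmpty) None else Some(snapshots(snapshots.keys.max))
>
>   // Deletion driven purely by cached metadata, no directory listing
>   def deleteSnapshotFiles(predicate: Long => Boolean = _ => true): Unit =
>     snapshots.keys.filter(predicate).foreach { offset =>
>       snapshots.remove(offset).foreach(f => Files.deleteIfExists(f.toPath))
>     }
> }
> {code}
>
> With a cache like this, the truncation path could call deleteSnapshotFiles
> on just the truncated partition's cache instead of listing all 10k
> directories.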
> Also, we noticed that there is no way to disable transaction/exactly-once
> support on the broker side, even though it brings in some extra overhead
> when no clients use the feature. Not sure whether this is a common use case,
> but it would be useful to have a switch to avoid the extra performance
> overhead.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)