[ https://issues.apache.org/jira/browse/KAFKA-19571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18012532#comment-18012532 ]
Ilyas Toumlilt commented on KAFKA-19571:
----------------------------------------

https://issues.apache.org/jira/browse/KAFKA-15391 is similar but different: it swallows {{NoSuchFileException}} for the race condition on log directory move/delete, but not at the segment file level.

> Race condition between log segment flush and file deletion causing log dir to go offline
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19571
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19571
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, log
>    Affects Versions: 3.7.1
>            Reporter: Ilyas Toumlilt
>            Priority: Major
>
> h1. Context
> We are using Kafka v3.7.1 with ZooKeeper. Our brokers are configured with multiple disks in a JBOD setup, and routine intra-broker data rebalancing is performed with Cruise Control to manage disk utilization. During these rebalance operations, a race condition can occur between a log segment flush operation and the file deletion that is part of the replica's directory move. This race leads to a {{NoSuchFileException}} when the flush operation targets a file path that has just been deleted by the rebalance process. This exception incorrectly forces the broker to take the entire log directory offline.
> h1. Logs / Stack trace
> {code:java}
> 2025-07-23 19:03:30,114 WARN Failed to flush file /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.common.utils.Utils)
> java.nio.file.NoSuchFileException: /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
>     at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>     at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>     at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>     at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
>     at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
>     at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
>     at org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
>     at kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
>     at kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
>     at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
>     at java.base/java.util.Optional.ifPresent(Optional.java:183)
>     at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
>     at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>     at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>     at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
>     at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
>     at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
>     at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
>     at com.yammer.metrics.core.Timer.time(Timer.java:91)
>     at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
>     at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>     at kafka.log.LocalLog.flush(LocalLog.scala:176)
>     at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
>     at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
>     at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
>     at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
>     at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
> org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595
> Caused by: java.nio.channels.ClosedChannelException
>     at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>     at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
>     at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
>     at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
>     at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
>     at com.yammer.metrics.core.Timer.time(Timer.java:91)
>     at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
>     at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>     at kafka.log.LocalLog.flush(LocalLog.scala:176)
>     at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
>     at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
>     at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
>     at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
>     at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager)
> {code}
> h1. Stack Trace Analysis
> The failure begins with a benign {{WARN}} when a scheduled task tries to flush a producer state snapshot that was moved during a disk rebalance; this {{NoSuchFileException}} is anticipated and swallowed gracefully, as implemented in https://issues.apache.org/jira/browse/KAFKA-13403.
> However, the same task then attempts to flush the actual log segment, which fails with a critical, unhandled {{ClosedChannelException}} because the file handles were invalidated by the directory move. This unhandled I/O error propagates up and terminates the background task, causing the {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a direct consequence, the {{ReplicaManager}} detects this fatal storage error and triggers its safety mechanism, taking the entire log directory offline to prevent potential data corruption.
> h1. Expected Behavior
> A {{NoSuchFileException}} in this context should not cause the entire log directory to be marked as offline.
> h1. Workaround
> The current workaround is to manually restart the affected Kafka broker. The restart clears the in-memory state, and upon re-scanning the log directories, the broker marks the disk as healthy again.
> h1. Proposed fix
> The proposed solution is to address the race condition at the lowest possible level: the *{{LogSegment.flush()}}* method. The goal is to catch the {{ClosedChannelException}} that occurs during the race and differentiate it from a legitimate I/O error.
> The fix should be implemented within the {{catch}} block for {{ClosedChannelException}} in {{LogSegment.java}}. The logic would be as follows:
> # When a {{ClosedChannelException}} is caught, check whether the underlying log segment file still exists ({{log.file().exists()}}).
> # *If the file does not exist*, this confirms the race condition hypothesis: the replica has been moved or deleted by a rebalance operation. The exception is benign and should be ignored, with a {{WARN}} message logged for visibility.
> # *If the file does still exist*, the {{ClosedChannelException}} is unexpected and could signal a real hardware or filesystem issue. In this case, the exception should be re-thrown to trigger Kafka's standard log directory failure-handling mechanism.
> This targeted fix would resolve the bug by gracefully handling the known race condition without masking other potentially critical storage errors (a rough sketch of this check is appended below).
> h2. Related issues
> * https://issues.apache.org/jira/browse/KAFKA-13403 was fixed to swallow the first {{NoSuchFileException}} WARN in the above stack trace, but not the underlying exception.
> * https://issues.apache.org/jira/browse/KAFKA-15391 is similar but different: it swallows {{NoSuchFileException}} for the race condition on log directory move/delete, but not at the segment file level.
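For illustration, here is a minimal, self-contained sketch of the check described in the proposed fix. This is not Kafka's actual {{LogSegment}} code: the class, method, and parameter names are placeholders, and in the real fix the segment file would be reached via {{log.file()}} inside the {{ClosedChannelException}} catch block of {{LogSegment.flush()}}, with the warning going through the segment's logger.

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

/**
 * Standalone sketch of the check proposed for LogSegment.flush():
 * swallow a ClosedChannelException only when the backing segment file
 * no longer exists (i.e. the replica was moved or deleted concurrently).
 * Names here are illustrative, not Kafka's actual API.
 */
public class SegmentFlushSketch {

    /** Flush the segment channel, tolerating a concurrent move/delete of the file. */
    static void flushSegment(FileChannel channel, File segmentFile) throws IOException {
        try {
            channel.force(true); // what FileRecords.flush() ultimately does
        } catch (ClosedChannelException e) {
            if (!segmentFile.exists()) {
                // Race with a directory move/delete (e.g. a Cruise Control
                // intra-broker rebalance): the segment is gone, the flush is moot,
                // so warn and ignore instead of failing the whole log directory.
                System.err.printf("WARN: ignoring flush of deleted segment %s%n",
                        segmentFile.getAbsolutePath());
                return;
            }
            // The file is still there, so the closed channel points at a real
            // storage problem; rethrow to keep the normal log-dir failure path.
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // Tiny demo of the benign case: close the channel and delete the file,
        // then flush - the sketch swallows the resulting exception.
        File f = File.createTempFile("00000000000000000000", ".log");
        FileChannel ch = FileChannel.open(f.toPath(), StandardOpenOption.WRITE);
        ch.close();
        f.delete();
        flushSegment(ch, f); // logs a WARN instead of propagating
    }
}
{code}

The existence check keeps the change narrow: only the known benign race is swallowed, while a {{ClosedChannelException}} on a file that still exists continues to flow into the standard log-directory failure handling.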