[ 
https://issues.apache.org/jira/browse/KAFKA-19571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18012531#comment-18012531
 ] 

Ilyas Toumlilt commented on KAFKA-19571:
----------------------------------------

https://issues.apache.org/jira/browse/KAFKA-13403 was fixed to swallow the 
first {{NoSuchFileException}} WARN in the stack trace from the issue 
description, but not the underlying exception.
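
To make the distinction between the two exceptions concrete, here is a minimal 
standalone sketch using only java.nio, not Kafka code. The class name 
FlushRaceDemo, its method names, and the temp file names are made up for 
illustration. It shows that opening a channel on a path that is gone raises 
NoSuchFileException, while forcing an already-closed channel raises 
ClosedChannelException, so handling the first does nothing for the second.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; none of these names exist in Kafka.
public class FlushRaceDemo {

    // Opens a fresh channel on the path and flushes it, the way a
    // "flush if exists" helper would. A missing path surfaces here.
    public static String openMissing(Path path) throws IOException {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            ch.force(true);
            return "flushed";
        } catch (NoSuchFileException e) {
            return "NoSuchFileException";
        }
    }

    // Flushes through a channel that was closed underneath the caller,
    // which is what a long-lived segment channel sees after a move.
    public static String forceClosed(Path path) throws IOException {
        FileChannel ch = FileChannel.open(path, StandardOpenOption.WRITE);
        ch.close();
        try {
            ch.force(true);
            return "flushed";
        } catch (ClosedChannelException e) {
            return "ClosedChannelException";
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flush-race-demo");
        Path segment = Files.createFile(dir.resolve("00000000000000000000.log"));
        Path snapshot = dir.resolve("00000000000000000000.snapshot"); // never created

        System.out.println(openMissing(snapshot)); // prints NoSuchFileException
        System.out.println(forceClosed(segment));  // prints ClosedChannelException
    }
}
```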

> Race condition between log segment flush and file deletion causing log dir to 
> go offline
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19571
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19571
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, log
>    Affects Versions: 3.7.1
>            Reporter: Ilyas Toumlilt
>            Priority: Major
>
> h1. Context
> We are running Kafka v3.7.1 with ZooKeeper. Our brokers are configured with 
> multiple disks in a JBOD setup, and routine intra-broker data rebalancing is 
> performed using Cruise Control to manage disk utilization. During these 
> rebalance operations, a race condition occurs between a log segment flush 
> operation and the file deletion that is part of the replica's directory move. 
> This race leads to a {{NoSuchFileException}} when the flush operation targets 
> a file path that has just been deleted by the rebalance process. This 
> exception incorrectly forces the broker to take the entire log directory 
> offline.
> h1. Logs / Stack trace
> {code:java}
> 2025-07-23 19:03:30,114 WARN Failed to flush file 
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot 
> (org.apache.kafka.common.utils.Utils)
> java.nio.file.NoSuchFileException: 
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
>         at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>         at 
> java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
>         at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
>         at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
>         at 
> org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
>         at 
> kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
>         at 
> kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
>         at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
>         at java.base/java.util.Optional.ifPresent(Optional.java:183)
>         at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>         
> 2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir 
> /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 
> 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at 
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>         at 
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
>         at com.yammer.metrics.core.Timer.time(Timer.java:91)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
>         at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at kafka.log.LocalLog.flush(LocalLog.scala:176)
>         at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
>         at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
>         at 
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
>         at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>         
> 2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 
> 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
> org.apache.kafka.common.errors.KafkaStorageException: Error while flushing 
> log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 
> (exclusive) and recovery point 24420850595
> Caused by: java.nio.channels.ClosedChannelException
>         at 
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>         at 
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
>         at com.yammer.metrics.core.Timer.time(Timer.java:91)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
>         at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at kafka.log.LocalLog.flush(LocalLog.scala:176)
>         at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
>         at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
>         at 
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
>         at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>         
> 2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving 
> replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager) {code}
> h1. Stack Trace Analysis
> The failure begins with a benign {{WARN}} when a scheduled task tries to 
> flush a producer state snapshot that was moved during a disk rebalance. This 
> {{NoSuchFileException}} is anticipated and handled gracefully by the code: 
> https://issues.apache.org/jira/browse/KAFKA-13403 changed the flush to 
> swallow the exception. 
> However, the same task then attempts to flush the actual log segment, which 
> fails with a critical, unhandled {{ClosedChannelException}} because the file 
> handles were invalidated by the directory's move. This unhandled I/O error 
> propagates up and terminates the background task, causing the 
> {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a 
> direct consequence, the {{ReplicaManager}} detects this fatal storage error 
> and triggers its safety mechanism, taking the entire log directory offline to 
> prevent potential data corruption.
> h1. Expected Behavior
> A {{NoSuchFileException}} in this context should not cause the entire log 
> directory to be marked as offline.
> h1. Workaround
> The current workaround is to manually restart the affected Kafka broker. The 
> restart clears the in-memory state, and upon re-scanning the log directories, 
> the broker marks the disk as healthy again.
> h1. Proposed fix
> The proposed solution is to address the race condition at the lowest possible 
> level: the *{{LogSegment.flush()}}* method. The goal is to catch the 
> {{ClosedChannelException}} that occurs during the race and intelligently 
> differentiate it from a legitimate I/O error.
> The fix should be implemented within the {{catch}} block for 
> {{ClosedChannelException}} in {{LogSegment.java}}. The logic would be as 
> follows:
>  # When a {{ClosedChannelException}} is caught, perform a check to see if the 
> underlying log segment file still exists ({{log.file().exists()}}).
>  # *If the file does not exist*, it confirms our race condition 
> hypothesis: the replica has been moved or deleted by a rebalance operation. 
> The exception is benign and should be ignored, with a {{WARN}} message logged 
> for visibility.
>  # *If the file does still exist*, the {{ClosedChannelException}} is 
> unexpected and could signal a real hardware or filesystem issue. In this 
> case, the exception should be re-thrown to trigger Kafka's standard log 
> directory failure-handling mechanism.
> This targeted fix would resolve the bug by gracefully handling the known race 
> condition without masking other potentially critical storage errors.
> h2. Related issues
>  * https://issues.apache.org/jira/browse/KAFKA-13403 was fixed to swallow the 
> first {{NoSuchFileException}} WARN in the above stack trace, but not the 
> underlying exception.
>  * https://issues.apache.org/jira/browse/KAFKA-15391 is similar but 
> different: it swallows {{NoSuchFileException}} for the race condition on log 
> directory move/delete, but not at the segment file level.
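
For discussion, the three-step logic in the quoted "Proposed fix" section could 
be sketched roughly as follows. This is a standalone illustration, not the 
actual LogSegment code: the class GuardedSegmentFlush and its flush(channel, 
file) helper are hypothetical stand-ins, the WARN is just printed to stderr, 
and the boolean return value exists only to make the outcome observable.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in for the flush step; not part of Kafka.
public class GuardedSegmentFlush {

    // Returns true if the data was forced to disk, false if the flush was
    // skipped because the segment file no longer exists at its path.
    public static boolean flush(FileChannel channel, File segmentFile) throws IOException {
        try {
            channel.force(true);
            return true;
        } catch (ClosedChannelException e) {
            // Steps 1 and 2: channel closed and file gone, so treat it as the benign race.
            if (!segmentFile.exists()) {
                System.err.println("WARN Skipping flush of " + segmentFile
                        + ": file no longer exists");
                return false;
            }
            // Step 3: file still present, so the closed channel is unexpected.
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("guarded-flush").toFile();
        File segment = new File(dir, "00000000000000000000.log");
        Files.createFile(segment.toPath());

        FileChannel channel = FileChannel.open(segment.toPath(), StandardOpenOption.WRITE);
        System.out.println(flush(channel, segment)); // prints true

        // Simulate the move: the channel is closed and the file disappears.
        channel.close();
        Files.delete(segment.toPath());
        System.out.println(flush(channel, segment)); // prints false
    }
}
```

One caveat with this approach is that the exists() check is itself a 
check-then-act step, so a file deleted between the failed force() and the 
check would still be classified as benign, while one deleted just after the 
check would still take the directory offline. Whether that is acceptable is a 
design question for the actual patch.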



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
