Ilyas Toumlilt created KAFKA-19571:
--------------------------------------

             Summary: Race condition between log segment flush and file 
deletion causing log dir to go offline
                 Key: KAFKA-19571
                 URL: https://issues.apache.org/jira/browse/KAFKA-19571
             Project: Kafka
          Issue Type: Bug
          Components: core, log
    Affects Versions: 3.7.1
            Reporter: Ilyas Toumlilt


h1. Context

We are using Kafka v3.7.1 with ZooKeeper. Our brokers are configured with multiple disks in a JBOD setup, and routine intra-broker data rebalancing is performed with Cruise Control to manage disk utilization. During these rebalance operations, a race condition can occur between a log segment flush operation and the file deletion that is part of the replica's directory move. This race leads to a {{NoSuchFileException}} when the flush operation targets a file path that has just been deleted by the rebalance process, and the resulting failure incorrectly forces the broker to take the entire log directory offline.
h1. Logs / Stack trace
{code:java}
2025-07-23 19:03:30,114 WARN Failed to flush file /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.common.utils.Utils)
java.nio.file.NoSuchFileException: 
/var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
        at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
        at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at 
java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
        at 
org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
        at 
kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
        at 
kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
        at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
        at java.base/java.util.Optional.ifPresent(Optional.java:183)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
        at 
org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
        
2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
        at 
java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
        at 
org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
        at 
org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
        at 
org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at 
org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
        at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at kafka.log.LocalLog.flush(LocalLog.scala:176)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
        at 
org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
        
2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 'flush-log' 
(org.apache.kafka.server.util.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595
Caused by: java.nio.channels.ClosedChannelException
        at 
java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
        at 
org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
        at 
org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
        at 
org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at 
org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
        at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at kafka.log.LocalLog.flush(LocalLog.scala:176)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
        at 
org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
        
2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving 
replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager) {code}
h1. Stack Trace Analysis

The failure begins with a benign {{WARN}} when a scheduled task tries to flush a producer state snapshot that was moved during a disk rebalance; this {{NoSuchFileException}} is anticipated and handled gracefully by the code. However, the same task then attempts to flush the actual log segment, which fails with a critical, unhandled {{ClosedChannelException}} because the file handles were invalidated by the directory's move. This unhandled I/O error propagates up and terminates the background task, causing the {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a direct consequence, the {{ReplicaManager}} detects this fatal storage error and triggers its safety mechanism, taking the entire log directory offline to prevent potential data corruption.
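
For illustration, the following minimal Java sketch (not Kafka's actual code; all names are hypothetical) mirrors the asymmetry visible in the trace: the snapshot flush path tolerates a missing file, whereas forcing a segment channel that was already closed by the directory move throws a {{ClosedChannelException}} that nothing on that path handles.
{code:java}
// Hypothetical sketch of the two flush paths seen above; not Kafka source code.
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushRaceSketch {

    // Analogous to the producer-state snapshot flush: a file that disappeared
    // underneath us only produces a WARN and is otherwise ignored.
    static void flushSnapshotIfExists(Path snapshot) throws IOException {
        try (FileChannel ch = FileChannel.open(snapshot, StandardOpenOption.READ)) {
            ch.force(true);
        } catch (NoSuchFileException e) {
            System.err.println("WARN Failed to flush file " + snapshot); // anticipated, swallowed
        }
    }

    // Analogous to the segment flush: if the channel was already closed when the
    // replica's directory was moved, force(true) throws ClosedChannelException,
    // which is not caught on this path and propagates up to the scheduled task.
    static void flushSegment(FileChannel segmentChannel) throws IOException {
        segmentChannel.force(true);
    }
}
{code}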
h1. Expected Behavior

A {{NoSuchFileException}} in this context should not cause the entire log directory to be marked as offline.
h1. Workaround

The current workaround is to manually restart the affected Kafka broker. The 
restart clears the in-memory state, and upon re-scanning the log directories, 
the broker marks the disk as healthy again. 
h1. Proposed fix

The proposed solution is to address the race condition at the lowest possible 
level: the *{{LogSegment.flush()}}* method. The goal is to catch the 
{{ClosedChannelException}} that occurs during the race and intelligently 
differentiate it from a legitimate I/O error.

The fix should be implemented within the {{catch}} block for {{ClosedChannelException}} in {{LogSegment.java}}. The logic would be as follows:
 # When a {{ClosedChannelException}} is caught, check whether the underlying log segment file still exists ({{log.file().exists()}}).

 # *If the file does not exist*, it confirms our race condition hypothesis: the replica has been moved or deleted by a rebalance operation. The exception is benign and should be ignored, with a {{WARN}} message logged for visibility.

 # *If the file still exists*, the {{ClosedChannelException}} is unexpected and could signal a real hardware or filesystem issue. In this case, the exception should be re-thrown to trigger Kafka's standard log directory failure-handling mechanism.

This targeted fix would resolve the bug by gracefully handling the known race 
condition without masking other potentially critical storage errors.
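
As a rough illustration only, the sketch below shows the shape of that existence check. The class, field, and method names ({{SegmentFlushSketch}}, {{segmentFile}}, {{channel}}) are hypothetical stand-ins rather than the real {{LogSegment}} internals, so this is a sketch of the proposed logic, not a patch against {{LogSegment.java}}.
{code:java}
// Hedged sketch of the proposed check; class, field, and method names are
// hypothetical stand-ins, not Kafka's actual LogSegment internals.
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SegmentFlushSketch {
    private static final Logger LOGGER = Logger.getLogger(SegmentFlushSketch.class.getName());

    /** Hypothetical abstraction over the segment channel's force() call. */
    interface SegmentChannel {
        void force() throws IOException;
    }

    private final File segmentFile;        // stand-in for log.file()
    private final SegmentChannel channel;  // stand-in for the underlying FileChannel

    SegmentFlushSketch(File segmentFile, SegmentChannel channel) {
        this.segmentFile = segmentFile;
        this.channel = channel;
    }

    void flush() throws IOException {
        try {
            channel.force();
        } catch (ClosedChannelException e) {
            if (!segmentFile.exists()) {
                // Race confirmed: the segment was moved or deleted (e.g. by an
                // intra-broker rebalance), so the closed channel is benign.
                LOGGER.log(Level.WARNING,
                        "Ignoring flush of " + segmentFile + ": segment file no longer exists", e);
            } else {
                // The file is still present, so the closed channel may indicate a
                // genuine storage problem; re-throw so the standard log directory
                // failure handling takes over.
                throw e;
            }
        }
    }
}
{code}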



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
