Ilyas Toumlilt created KAFKA-19571:
--------------------------------------

             Summary: Race condition between log segment flush and file deletion causing log dir to go offline
                 Key: KAFKA-19571
                 URL: https://issues.apache.org/jira/browse/KAFKA-19571
             Project: Kafka
          Issue Type: Bug
          Components: core, log
    Affects Versions: 3.7.1
            Reporter: Ilyas Toumlilt
h1. Context

We are using Kafka v3.7.1 with ZooKeeper. Our brokers are configured with multiple disks in a JBOD setup, and routine intra-broker data rebalancing is performed using Cruise Control to manage disk utilization. During these rebalance operations, a race condition can occur between a log segment flush operation and the file deletion that is part of the replica's directory move. This race leads to a {{NoSuchFileException}} when the flush operation targets a file path that has just been deleted by the rebalance process. This exception incorrectly forces the broker to take the entire log directory offline.

h1. Logs / Stack trace

{code:java}
2025-07-23 19:03:30,114 WARN Failed to flush file /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.common.utils.Utils)
java.nio.file.NoSuchFileException: /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
	at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
	at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
	at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
	at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
	at org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
	at kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
	at kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
	at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
	at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
	at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
	at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
	at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
	at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
	at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
	at com.yammer.metrics.core.Timer.time(Timer.java:91)
	at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
	at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at kafka.log.LocalLog.flush(LocalLog.scala:176)
	at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
	at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
	at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
	at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
	at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595
Caused by: java.nio.channels.ClosedChannelException
	at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
	at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
	at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
	at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
	at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
	at com.yammer.metrics.core.Timer.time(Timer.java:91)
	at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
	at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at kafka.log.LocalLog.flush(LocalLog.scala:176)
	at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
	at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
	at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
	at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
	at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager)
{code}

h1. Stack Trace Analysis

The failure begins with a benign {{WARN}} when a scheduled task tries to flush a producer state snapshot that was moved during a disk rebalance; this {{NoSuchFileException}} is anticipated and handled gracefully by the code. However, the same task then attempts to flush the actual log segment, which fails with a critical, unhandled {{ClosedChannelException}} because the file handles were invalidated by the directory's move. This unhandled I/O error propagates up and terminates the background task, causing the {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a direct consequence, the {{ReplicaManager}} detects this fatal storage error and triggers its safety mechanism, taking the entire log directory offline to prevent potential data corruption.
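To make the asymmetry concrete, here is a minimal sketch of the two flush paths seen in the trace. The method bodies paraphrase {{Utils.flushFileIfExists}} and {{FileRecords.flush}} for illustration only; they are not the actual Kafka source.

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class FlushContrast {
    // Snapshot path: opens the file by name, so a deleted file surfaces as
    // NoSuchFileException, which is caught and merely logged (the benign WARN).
    static void flushSnapshotIfExists(Path snapshot) throws IOException {
        try (FileChannel ch = FileChannel.open(snapshot, StandardOpenOption.READ)) {
            ch.force(true);
        } catch (NoSuchFileException e) {
            System.err.println("WARN Failed to flush file " + snapshot);
        }
    }

    // Segment path: forces an already-open channel. If the directory move closed
    // that channel, force() throws ClosedChannelException, which is not handled
    // here and propagates up as a fatal KafkaStorageException.
    static void flushSegment(FileChannel openChannel) throws IOException {
        openChannel.force(true);
    }
}
{code}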
h1. Expected Behavior

A {{NoSuchFileException}} in this context should not cause the entire log directory to be marked as offline.

h1. Workaround

The current workaround is to manually restart the affected Kafka broker. The restart clears the in-memory state, and upon re-scanning the log directories, the broker marks the disk as healthy again.

h1. Proposed fix

The proposed solution is to address the race condition at the lowest possible level: the *{{LogSegment.flush()}}* method. The goal is to catch the {{ClosedChannelException}} that occurs during the race and differentiate it from a legitimate I/O error.

The fix should be implemented within the {{catch}} block for {{ClosedChannelException}} in {{LogSegment.java}}. The logic would be as follows (see the sketch after this list):
# When a {{ClosedChannelException}} is caught, check whether the underlying log segment file still exists ({{log.file().exists()}}).
# *If the file does not exist*, this confirms the race condition hypothesis: the replica has been moved or deleted by a rebalance operation. The exception is benign and should be ignored, with a {{WARN}} message logged for visibility.
# *If the file does still exist*, the {{ClosedChannelException}} is unexpected and could signal a real hardware or filesystem issue. In this case, the exception should be re-thrown to trigger Kafka's standard log directory failure-handling mechanism.

This targeted fix would resolve the bug by gracefully handling the known race condition without masking other potentially critical storage errors.
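A minimal sketch of this catch logic, written as a self-contained class: the names {{segmentFile}} and {{channel}} are illustrative stand-ins for {{log.file()}} and the segment's open {{FileChannel}}, not the actual {{LogSegment}} fields.

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

class SegmentFlushSketch {
    private final File segmentFile;    // stand-in for log.file()
    private final FileChannel channel; // stand-in for the segment's open channel

    SegmentFlushSketch(File segmentFile, FileChannel channel) {
        this.segmentFile = segmentFile;
        this.channel = channel;
    }

    void flush() throws IOException {
        try {
            channel.force(true); // the call that races with the directory move
        } catch (ClosedChannelException e) {
            if (!segmentFile.exists()) {
                // Step 2: the file is gone, so the replica was moved or deleted
                // by a rebalance; treat the flush as a benign no-op and warn.
                System.err.println("WARN Ignoring flush of deleted segment " + segmentFile);
            } else {
                // Step 3: the file still exists, so the closed channel signals a
                // genuine I/O problem; re-throw so log-dir failure handling runs.
                throw e;
            }
        }
    }
}
{code}

Checking file existence only on the exception path keeps the happy path unchanged, so the extra filesystem call costs nothing during normal flushes.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)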