[ https://issues.apache.org/jira/browse/KAFKA-19571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ilyas Toumlilt updated KAFKA-19571:
-----------------------------------
Description: 

h1. Context

We are running Kafka v3.7.1 with ZooKeeper. Our brokers are configured with multiple disks in a JBOD setup, and routine intra-broker data rebalancing is performed with Cruise Control to manage disk utilization. During these rebalance operations, a race condition can occur between a log segment flush and the file deletion that is part of the replica's directory move. The race leads to a {{NoSuchFileException}} when the flush targets a file path that has just been deleted by the rebalance process, and this exception incorrectly forces the broker to take the entire log directory offline.
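The losing interleaving is easy to demonstrate outside Kafka. The following self-contained sketch (hypothetical class and file names, not Kafka code) closes and deletes a file behind the back of a pending flush, deterministically producing the same {{ClosedChannelException}} / {{NoSuchFileException}} pair that appears in the logs below:

{code:java}
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushDeleteRace {
    public static void main(String[] args) throws Exception {
        Path segment = Files.createTempFile("00000000000000000000", ".log");
        FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);

        // "Rebalance" thread: the directory move closes the old handle and
        // removes the old file. join() forces the losing interleaving that a
        // real rebalance only hits occasionally.
        Thread mover = new Thread(() -> {
            try {
                channel.close();
                Files.deleteIfExists(segment);
            } catch (IOException ignored) {
            }
        });
        mover.start();
        mover.join();

        // Scheduled "flush-log" task: the handle was invalidated underneath it.
        try {
            channel.force(true);
        } catch (ClosedChannelException e) {
            System.out.println("ClosedChannelException; file exists = " + Files.exists(segment));
        }

        // Snapshot flush: re-opening the deleted path fails the same way as
        // the first WARN in the stack trace.
        try (FileChannel reopened = FileChannel.open(segment, StandardOpenOption.READ)) {
            reopened.force(true);
        } catch (NoSuchFileException e) {
            System.out.println("NoSuchFileException: " + e.getMessage());
        }
    }
}
{code}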
h1. Logs / Stack trace

{code:java}
2025-07-23 19:03:30,114 WARN Failed to flush file /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.common.utils.Utils)
java.nio.file.NoSuchFileException: /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
        at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
        at org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
        at kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
        at kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
        at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
        at java.base/java.util.Optional.ifPresent(Optional.java:183)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
        at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
        at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
        at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
        at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
        at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
        at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at kafka.log.LocalLog.flush(LocalLog.scala:176)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
        at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595
Caused by: java.nio.channels.ClosedChannelException
        at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
        at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
        at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
        at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
        at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at kafka.log.LocalLog.flush(LocalLog.scala:176)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
        at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager)
{code}

h1. Stack Trace Analysis

The failure begins with a benign WARN when a scheduled task tries to flush a producer state snapshot that was moved during a disk rebalance. This {{NoSuchFileException}} is anticipated and handled gracefully, as implemented in https://issues.apache.org/jira/browse/KAFKA-13403, which swallows the exception.
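For reference, the snapshot flush survives the vanished file because it follows a flush-if-exists pattern along these lines (a simplified sketch of the KAFKA-13403 behavior, not the actual {{Utils.flushFileIfExists}} source):

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FlushIfExistsSketch {
    // Fsync the file if it is still there; treat a concurrent move/delete
    // (e.g. by a disk rebalance) as non-fatal and only log a warning.
    static void flushFileIfExists(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            channel.force(true);
        } catch (NoSuchFileException e) {
            System.err.println("WARN Failed to flush file " + path);
        }
    }
}
{code}

The log segment flush that runs next has no such guard, which is where the failure below originates.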
However, the same task then attempts to flush the actual log segment, which fails with a critical, unhandled {{ClosedChannelException}} because the segment's file handles were invalidated by the directory move. This unhandled I/O error propagates up and terminates the background task, causing the {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a direct consequence, the {{ReplicaManager}} detects this fatal storage error and triggers its safety mechanism, taking the entire log directory offline to prevent potential data corruption.

h1. Expected Behavior

A {{NoSuchFileException}} in this context should not cause the entire log directory to be marked as offline.

h1. Workaround

The current workaround is to manually restart the affected Kafka broker. The restart clears the in-memory state, and upon re-scanning the log directories the broker marks the disk as healthy again.

h1. Proposed fix

The proposed solution addresses the race condition at the lowest possible level: the {{LogSegment.flush()}} method. The goal is to catch the {{ClosedChannelException}} that occurs during the race and differentiate it from a legitimate I/O error.

The fix should be implemented within the {{catch}} block for {{ClosedChannelException}} in {{LogSegment.java}}. The logic would be as follows (a sketch appears after the Related issues list below):
# When a {{ClosedChannelException}} is caught, check whether the underlying log segment file still exists ({{log.file().exists()}}).
# *If the file does not exist*, this confirms the race condition hypothesis: the replica has been moved or deleted by a rebalance operation. The exception is benign and should be ignored, with a {{WARN}} message logged for visibility.
# *If the file does still exist*, the {{ClosedChannelException}} is unexpected and could signal a real hardware or filesystem issue. In this case, the exception should be re-thrown to trigger Kafka's standard log directory failure-handling mechanism.

This targeted fix would resolve the bug by gracefully handling the known race condition without masking other potentially critical storage errors.

h2. Related issues
* https://issues.apache.org/jira/browse/KAFKA-13403 was fixed to swallow the first {{NoSuchFileException}} WARN in the above stack trace, but not the underlying exception.
* https://issues.apache.org/jira/browse/KAFKA-15391 is similar but different: it swallows {{NoSuchFileException}} for the race condition on log directory move/delete, but not at the segment file level.
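A minimal sketch of the proposed handling, assuming the current shape of {{LogSegment.flush()}} and its members as they appear in the stack trace ({{log}} is the segment's {{FileRecords}}; {{LOGGER}}, {{offsetIndex()}}, {{timeIndex()}} and {{txnIndex}} are assumed names, not quoted from the source). This illustrates the approach rather than the final patch:

{code:java}
// LogSegment.java (sketch): guard the flush against the rebalance race.
// Requires java.io.IOException and java.nio.channels.ClosedChannelException.
public void flush() throws IOException {
    try {
        log.flush();            // forces the segment's FileChannel
        offsetIndex().flush();
        timeIndex().flush();
        txnIndex.flush();
    } catch (ClosedChannelException e) {
        if (!log.file().exists()) {
            // Segment file is gone: the replica was moved or deleted by a
            // concurrent operation (e.g. an intra-broker rebalance). Benign;
            // warn for visibility and swallow.
            LOGGER.warn("Failed to flush segment {}: the file no longer exists, " +
                    "likely moved or deleted by a concurrent rebalance.", log.file(), e);
        } else {
            // File still present: the closed channel may indicate a genuine
            // storage fault, so rethrow to trigger the standard log-directory
            // failure handling.
            throw e;
        }
    }
}
{code}

Because the existence check runs only on the failure path, normal flushes are unaffected, and a genuine storage fault (file still present but channel unusable) still takes the log directory offline exactly as before.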
> Race condition between log segment flush and file deletion causing log dir to go offline
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19571
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19571
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, log
>    Affects Versions: 3.7.1
>            Reporter: Ilyas Toumlilt
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)