[ https://issues.apache.org/jira/browse/KAFKA-695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13559935#comment-13559935 ]
Jay Kreps commented on KAFKA-695:
---------------------------------

Some research:
1. Also taking KAFKA-719 into account, we have examples of this happening in the background flush, in the read, and in append(). Flush and append only happen on the active segment, which complicates things.
2. The FileChannel in FileMessageSet is private, so the only way to close it is to call either close() or delete() on the message set.
3. Since delete() first calls close(), it is hard to say which code path was taken (you would get the same error message).
4. The only call to close() is in LogSegment.close(), and the only call to that is in Log.close(), so that's not it.
5. There are several calls to delete(), but all are inside the lock except for deleteSegments.
6. deleteSegments should, intuitively, not delete the active segment, since we force a segment roll if needed inside the lock inside markDeleteWhile().
7. It is possible, however, for deleteSegments to collide with truncateTo, though I am not sure how feasible this is.

> Broker shuts down due to attempt to read a closed index file
> ------------------------------------------------------------
>
>                 Key: KAFKA-695
>                 URL: https://issues.apache.org/jira/browse/KAFKA-695
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Priority: Blocker
>              Labels: p2
>
> Broker shuts down with the following error message -
> 2013/01/11 01:43:51.320 ERROR [KafkaApis] [request-expiration-task] [kafka] []
> [KafkaApi-277] error when processing request
> (service_metrics,2,39192,2000000)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>     at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
>     at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:82)
>     at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
>     at kafka.log.LogSegment.read(LogSegment.scala:106)
>     at kafka.log.Log.read(Log.scala:386)
>     at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
>     at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
>     at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
>     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>     at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
>     at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:519)
>     at
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:501)
>     at
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
>     at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:52.815 INFO [Processor] [kafka-processor-10251-2] [kafka] []
> Closing socket connection to /172.20.72.244.
> 2013/01/11 01:43:54.286 INFO [Processor] [kafka-processor-10251-3] [kafka] []
> Closing socket connection to /172.20.72.243.
> 2013/01/11 01:43:54.385 ERROR [LogManager] [kafka-logflusher-1] [kafka] []
> [Log Manager on Broker 277] Error flushing topic service_metrics
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>     at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
>     at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
>     at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
>     at kafka.log.LogSegment.flush(LogSegment.scala:151)
>     at kafka.log.Log.flush(Log.scala:493)
>     at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
>     at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
>     at
> kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
>     at
> kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
>     at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>     at
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.447 FATAL [LogManager] [kafka-logflusher-1] [kafka] []
> [Log Manager on Broker 277] Halting due to unrecoverable I/O error while
> flushing logs: null
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>     at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
>     at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
>     at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
>     at kafka.log.LogSegment.flush(LogSegment.scala:151)
>     at kafka.log.Log.flush(Log.scala:493)
>     at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
>     at
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
>     at
> kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
>     at
> kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
>     at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>     at
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.512 INFO [ComponentsContextLoaderListener] [Thread-2]
> [kafka] [] Shutting down...
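
For reference, here is a minimal Java sketch of the symptom seen in the traces above. It is not Kafka code: ClosedChannelDemo, ChannelOp and failsAfterClose are hypothetical names introduced only for illustration. It opens a FileChannel on a temp file, closes it (standing in for what close() or delete() on the message set would do), and then attempts a positional read and a force(), the two java.nio calls at the top of the traces. It is single-threaded, so it shows only what a reader or flusher would observe after some other code path has closed the channel, not the race itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ClosedChannelDemo {

    /** Hypothetical stand-in for an operation performed on a segment's channel. */
    interface ChannelOp {
        void run(FileChannel channel) throws IOException;
    }

    /**
     * Opens a channel on a temp file, writes a few bytes, closes the channel,
     * then runs op against it. Returns true if op failed with
     * ClosedChannelException, false if it completed normally.
     */
    static boolean failsAfterClose(ChannelOp op) {
        try {
            Path file = Files.createTempFile("segment", ".log");
            try {
                FileChannel channel = FileChannel.open(
                        file, StandardOpenOption.READ, StandardOpenOption.WRITE);
                channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3, 4}));
                channel.close(); // assumption: mimics close()/delete() on the message set
                try {
                    op.run(channel);
                    return false;
                } catch (ClosedChannelException e) {
                    return true;
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Positional read on a closed channel, as a fetch path might attempt. */
    static boolean readFailsAfterClose() {
        return failsAfterClose(ch -> ch.read(ByteBuffer.allocate(4), 0));
    }

    /** force() on a closed channel, as a background flush might attempt. */
    static boolean forceFailsAfterClose() {
        return failsAfterClose(ch -> ch.force(true));
    }

    public static void main(String[] args) {
        System.out.println("read after close fails:  " + readFailsAfterClose());
        System.out.println("force after close fails: " + forceFailsAfterClose());
    }
}
```

Both checks report true. The exception is the same whether the channel was closed directly or as the first step of a delete, which is why the error message alone cannot distinguish the two code paths.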