[ https://issues.apache.org/jira/browse/KAFKA-695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao updated KAFKA-695:
--------------------------

    Attachment: kafka-695.patch

I have a theory of what's happening here. What we overlooked is that there is another way to end up with a closed channel, besides closing it explicitly: if a thread is interrupted in the middle of a read or write on a file channel, the channel is closed automatically.

I suspect the following is what happened. The ExpiredRequestReaper thread was in the middle of expiring a FetchRequest when it was interrupted by ExpiredRequestReaper.forcePurge(). At the time of the interruption, the reaper thread could have been reading from the file channel (see the stack trace below). This causes the file channel to be closed, and all subsequent reads and writes on that channel fail with ClosedChannelException.

java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
    at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
    at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:83)
    at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
    at kafka.log.LogSegment.read(LogSegment.scala:91)
    at kafka.log.Log.read(Log.scala:390)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.immutable.Map$Map1.map(Map.scala:93)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
    at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:528)
    at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:510)
    at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
    at java.lang.Thread.run(Thread.java:619)

Attaching a quick fix. The root problem is that we shouldn't be using interrupt() to communicate with the ExpiredRequestReaper thread, since it has dangerous side effects. The patch uses a boolean flag to indicate that a full purge is needed, and changes the ExpiredRequestReaper thread to block for no more than 500 ms at a time, so that it gets a chance to perform the full purge. I'm not sure what the best way to unit test this is, though.

> Broker shuts down due to attempt to read a closed index file
> ------------------------------------------------------------
>
>                 Key: KAFKA-695
>                 URL: https://issues.apache.org/jira/browse/KAFKA-695
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Jun Rao
>            Priority: Blocker
>              Labels: p1
>         Attachments: kafka-695.patch
>
>
> Broker shuts down with the following error message -
> 2013/01/11 01:43:51.320 ERROR [KafkaApis] [request-expiration-task] [kafka] [] [KafkaApi-277] error when processing request (service_metrics,2,39192,2000000)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>     at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
>     at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:82)
>     at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
>     at kafka.log.LogSegment.read(LogSegment.scala:106)
>     at kafka.log.Log.read(Log.scala:386)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
>     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
>     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:519)
>     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:501)
>     at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
>     at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:52.815 INFO [Processor] [kafka-processor-10251-2] [kafka] [] Closing socket connection to /172.20.72.244.
> 2013/01/11 01:43:54.286 INFO [Processor] [kafka-processor-10251-3] [kafka] [] Closing socket connection to /172.20.72.243.
> 2013/01/11 01:43:54.385 ERROR [LogManager] [kafka-logflusher-1] [kafka] [] [Log Manager on Broker 277] Error flushing topic service_metrics
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>     at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
>     at kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
>     at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
>     at kafka.log.LogSegment.flush(LogSegment.scala:151)
>     at kafka.log.Log.flush(Log.scala:493)
>     at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
>     at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
>     at kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
>     at kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
>     at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.447 FATAL [LogManager] [kafka-logflusher-1] [kafka] [] [Log Manager on Broker 277] Halting due to unrecoverable I/O error while flushing logs: null
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>     at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
>     at kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
>     at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
>     at kafka.log.LogSegment.flush(LogSegment.scala:151)
>     at kafka.log.Log.flush(Log.scala:493)
>     at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
>     at kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>     at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
>     at kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
>     at kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
>     at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.512 INFO [ComponentsContextLoaderListener] [Thread-2] [kafka] [] Shutting down...
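The closed-channel behavior described in the comment is documented JDK behavior: FileChannel implements InterruptibleChannel, so a thread whose interrupt status is set when it invokes a blocking I/O operation on the channel receives ClosedByInterruptException and the channel is closed for every user. A minimal, self-contained sketch (class and method names are hypothetical, independent of the Kafka code) that reproduces it deterministically:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class InterruptClosesChannel {
    /** Returns true iff an interrupted read left the channel closed. */
    static boolean demo() throws IOException {
        Path tmp = Files.createTempFile("kafka-695", ".log");
        Files.write(tmp, new byte[]{1, 2, 3, 4});
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            // Set this thread's interrupt status, as forcePurge() did to the reaper thread.
            Thread.currentThread().interrupt();
            try {
                // FileChannel is an InterruptibleChannel: the pending interrupt
                // makes this read fail and closes the channel as a side effect.
                ch.read(ByteBuffer.allocate(4));
            } catch (ClosedByInterruptException expected) {
                // This is the "dangerous side effect" the comment refers to.
            } finally {
                Thread.interrupted(); // clear the interrupt status
            }
            // Once closed, every later read/write/force on this channel
            // fails with ClosedChannelException, as in the broker logs.
            return !ch.isOpen();
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("channel closed by interrupt: " + demo());
    }
}
```

Note that ClosedByInterruptException is a subclass of ClosedChannelException, which is why the logs above show the latter on the threads that touch the channel after the interrupted one.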
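The fix described in the comment (a boolean flag plus a bounded wait instead of interrupt()) can be sketched as follows. This is an illustrative stand-in, not the actual RequestPurgatory code; the names FlagBasedReaper, needsPurge, and purges are hypothetical:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Reaper loop that is signaled via a flag rather than Thread.interrupt(). */
public class FlagBasedReaper implements Runnable {
    private final LinkedBlockingQueue<String> delayed = new LinkedBlockingQueue<>();
    private final AtomicBoolean needsPurge = new AtomicBoolean(false);
    private volatile boolean running = true;
    public final AtomicInteger purges = new AtomicInteger(); // observable for the example

    /** Request a full purge without interrupting the reaper thread. */
    public void forcePurge() { needsPurge.set(true); }

    public void shutdown() { running = false; }

    @Override public void run() {
        while (running) {
            try {
                // Block at most 500 ms so a pending purge request (or shutdown)
                // is noticed promptly even when no request expires.
                String expired = delayed.poll(500, TimeUnit.MILLISECONDS);
                if (expired != null) {
                    // expire(expired): safe to touch file channels here, since
                    // nothing interrupts this thread mid-I/O anymore.
                }
                if (needsPurge.getAndSet(false)) {
                    purges.incrementAndGet(); // full scan of watched requests
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

The trade-off is that a forced purge may now wait up to 500 ms before it runs; the upside is that the reaper thread is never interrupted while holding a FileChannel open, which is exactly the failure mode in the stack traces above.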