[ 
https://issues.apache.org/jira/browse/KAFKA-695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13564463#comment-13564463
 ] 

Neha Narkhede commented on KAFKA-695:
-------------------------------------

Great catch! One question just to see if I understood the problem correctly. So 
the problem happens if the forcePurge() interrupts the expired request reaper 
while reading message set from the log. In that case, we should also see "error 
when processing request.." in the kafka log, because we catch all throwables in 
the readMessageSet() API. Do you see that ?

>> The purpose of forcePurge is to try to clean out dead memory?

Yes, this was added as part of fixing KAFKA-664

I think the patch looks good, it is fine to delay the full purge by a few ms.
                
> Broker shuts down due to attempt to read a closed index file
> ------------------------------------------------------------
>
>                 Key: KAFKA-695
>                 URL: https://issues.apache.org/jira/browse/KAFKA-695
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Jun Rao
>            Priority: Blocker
>              Labels: p1
>             Fix For: 0.8
>
>         Attachments: kafka-695.patch
>
>
> Broker shuts down with the following error message -
> 013/01/11 01:43:51.320 ERROR [KafkaApis] [request-expiration-task] [kafka] [] 
>  [KafkaApi-277] error when processing request 
> (service_metrics,2,39192,2000000)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>         at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:613)
>         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:82)
>         at kafka.log.LogSegment.translateOffset(LogSegment.scala:76)
>         at kafka.log.LogSegment.read(LogSegment.scala:106)
>         at kafka.log.Log.read(Log.scala:386)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
>         at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:519)
>         at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:501)
>         at 
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:222)
>         at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:52.815 INFO [Processor] [kafka-processor-10251-2] [kafka] [] 
>  Closing socket connection to /172.20.72.244.
> 2013/01/11 01:43:54.286 INFO [Processor] [kafka-processor-10251-3] [kafka] [] 
>  Closing socket connection to /172.20.72.243.
> 2013/01/11 01:43:54.385 ERROR [LogManager] [kafka-logflusher-1] [kafka] []  
> [Log Manager on Broker 277] Error flushing topic service_metrics
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
>         at 
> kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
>         at 
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>         at 
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
>         at kafka.log.LogSegment.flush(LogSegment.scala:151)
>         at kafka.log.Log.flush(Log.scala:493)
>         at 
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
>         at 
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>         at 
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
>         at 
> kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
>         at 
> kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
>         at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.447 FATAL [LogManager] [kafka-logflusher-1] [kafka] []  
> [Log Manager on Broker 277] Halting due to unrecoverable I/O error while 
> flushing logs: null
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:349)
>         at 
> kafka.log.FileMessageSet$$anonfun$flush$1.apply$mcV$sp(FileMessageSet.scala:154)
>         at 
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>         at 
> kafka.log.FileMessageSet$$anonfun$flush$1.apply(FileMessageSet.scala:154)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.log.FileMessageSet.flush(FileMessageSet.scala:153)
>         at kafka.log.LogSegment.flush(LogSegment.scala:151)
>         at kafka.log.Log.flush(Log.scala:493)
>        at kafka.log.LogSegment.flush(LogSegment.scala:151)
>         at kafka.log.Log.flush(Log.scala:493)
>         at 
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:319)
>         at 
> kafka.log.LogManager$$anonfun$kafka$log$LogManager$$flushDirtyLogs$2.apply(LogManager.scala:310)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>         at 
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
>         at 
> kafka.log.LogManager.kafka$log$LogManager$$flushDirtyLogs(LogManager.scala:310)
>         at 
> kafka.log.LogManager$$anonfun$startup$2.apply$mcV$sp(LogManager.scala:144)
>         at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:619)
> 2013/01/11 01:43:54.512 INFO [ComponentsContextLoaderListener] [Thread-2] 
> [kafka] []  Shutting down...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to