[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075671#comment-14075671
 ] 

Jun Rao commented on KAFKA-1414:
--------------------------------

50. Got it. I was thinking that if for some reason, one of the two values is 
defined, but the other is not, perhaps it's better to error out instead of 
silently let it go.

> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>             Fix For: 0.8.2
>
>         Attachments: 
> 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
> KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
> KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
> KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
> parallel-dir-loading-trunk-fixed-threadpool.patch, 
> parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
>
> After hard reset due to power failure broker takes way too much time 
> recovering unflushed segments in a single thread. This could be easiliy 
> improved launching multiple threads (one per data dirrectory, assuming that 
> typically each data directory is on a dedicated drive). Localy we trie this 
> simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
> to scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads : Array[Thread] = new Array[Thread](dirs.size)
>     var i: Int = 0
>     val me = this
>     for(dir <- dirs) {
>       val thread = new Thread( new Runnable {
>         def run()
>         {
>           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
>           /* load the logs */
>           val subDirs = dir.listFiles()
>           if(subDirs != null) {
>             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>             if(cleanShutDownFile.exists())
>               info("Found clean shutdown file. Skipping recovery for all logs 
> in data directory '%s'".format(dir.getAbsolutePath))
>             for(dir <- subDirs) {
>               if(dir.isDirectory) {
>                 info("Loading log '" + dir.getName + "'")
>                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
>                 val config = topicConfigs.getOrElse(topicPartition.topic, 
> defaultConfig)
>                 val log = new Log(dir,
>                   config,
>                   recoveryPoints.getOrElse(topicPartition, 0L),
>                   scheduler,
>                   time)
>                 val previous = addLogWithLock(topicPartition, log)
>                 if(previous != null)
>                   throw new IllegalArgumentException("Duplicate log 
> directories found: %s, %s!".format(log.dir.getAbsolutePath, 
> previous.dir.getAbsolutePath))
>               }
>             }
>             cleanShutDownFile.delete()
>           }
>         }
>       })
>       thread.start()
>       threads(i) = thread
>       i = i + 1
>     }
>     for(thread <- threads) {
>       thread.join()
>     }
>   }
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to