[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066477#comment-14066477 ]
Jun Rao commented on KAFKA-1414:
--------------------------------

Thanks for patch v4. Some more comments.

40. LogManager:
40.1. The logic in loadLogs is still not quite right. Even if the shutdown is clean, we still need to load the log; the initialization logic in Log knows whether to perform recovery or not. Also, we can't initialize dirLogs from logsByDir since it's empty during startup, so we have to get it from dir.listFiles().
40.2. The following logging in both loadLogs() and shutdown() is not quite right:

error("There was an error in one of the threads during logs loading: {}".format(e.getCause))

This is the slf4j style used in the new clients. The server side still uses log4j, so it should be:

error("There was an error in one of the threads during logs loading.", e.getCause)

40.3. Would it be clearer to name dirLogs as logsPerDir and dirJobs as jobsPerDir?
40.4. Remove the unused import of ExecutorService.

41. server.properties:
41.1. Typo: "shuch".
41.2. Also, would the following description be better?

# The number of threads to be used when performing io intensive operations such as
# log recovery and log flushing during startup and shutdown.

42. Your patch for Utils.runnable looks good. Could you include it in the next patch?
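For reference, here is a rough sketch (not the actual patch v4) of the shape suggested in 40.1, 40.3 and 42 above: per-topic directories are listed from the filesystem via dir.listFiles() rather than from logsByDir, the resulting work is grouped per data directory (logsPerDir/jobsPerDir), and each unit of work is wrapped by a Utils.runnable-style helper and submitted to a fixed thread pool. The names loadLogsSketch and ioThreads, and the exact shape of the runnable helper, are assumptions for illustration only.

{code}
import java.io.File
import java.util.concurrent.Executors

// Assumed shape of the Utils.runnable helper mentioned in 42:
// wrap a by-name block of code in a java.lang.Runnable.
def runnable(fun: => Unit): Runnable = new Runnable {
  def run() = fun
}

// Illustrative sketch only; names and structure are not from the actual patch.
def loadLogsSketch(logDirs: Seq[File], ioThreads: Int): Unit = {
  val pool = Executors.newFixedThreadPool(ioThreads)
  try {
    // 40.1: enumerate the per-topic-partition directories from the filesystem,
    // since logsByDir is still empty during startup.
    val logsPerDir = logDirs.map { dir =>
      dir -> Option(dir.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory)
    }
    // 40.3: one batch of jobs per data directory.
    val jobsPerDir = logsPerDir.map { case (dataDir, subDirs) =>
      subDirs.map { logDir =>
        pool.submit(runnable {
          // Construct the Log here; its initialization decides whether
          // recovery is actually needed (40.1), clean shutdown or not.
          println("Loading log '%s' from %s".format(logDir.getName, dataDir.getAbsolutePath))
        })
      }
    }
    // Wait for all loads to complete before startup proceeds.
    jobsPerDir.flatten.foreach(_.get())
  } finally {
    pool.shutdown()
  }
}
{code}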
> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>         Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
> After a hard reset due to power failure, the broker takes far too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
> {code}
> /**
>  * Recover and load all logs in the given data directories
>  */
> private def loadLogs(dirs: Seq[File]) {
>   // One recovery thread per data directory.
>   val threads: Array[Thread] = new Array[Thread](dirs.size)
>   var i: Int = 0
>   val me = this
>   for (dir <- dirs) {
>     val thread = new Thread(new Runnable {
>       def run() {
>         val recoveryPoints = me.recoveryPointCheckpoints(dir).read
>         /* load the logs */
>         val subDirs = dir.listFiles()
>         if (subDirs != null) {
>           val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>           if (cleanShutDownFile.exists())
>             info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
>           for (logDir <- subDirs) {
>             if (logDir.isDirectory) {
>               info("Loading log '" + logDir.getName + "'")
>               val topicPartition = Log.parseTopicPartitionName(logDir.getName)
>               val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
>               val log = new Log(logDir,
>                                 config,
>                                 recoveryPoints.getOrElse(topicPartition, 0L),
>                                 scheduler,
>                                 time)
>               val previous = addLogWithLock(topicPartition, log)
>               if (previous != null)
>                 throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
>             }
>           }
>           cleanShutDownFile.delete()
>         }
>       }
>     })
>     thread.start()
>     threads(i) = thread
>     i = i + 1
>   }
>   // Wait for every data directory to finish loading.
>   for (thread <- threads) {
>     thread.join()
>   }
> }
> def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>   logCreationOrDeletionLock synchronized {
>     this.logs.put(topicPartition, log)
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)