[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13981025#comment-13981025 ]
Dmitry Bugaychenko commented on KAFKA-1414: ------------------------------------------- Actually a similar improvement is needed for shutdown too - on a 20-disk machine with no forced flush during processing it could take more than 10 minutes to shutdown and flush logs in a single thread. We end up with the following workaround: {code} def shutdown() { info("Shutting down.") try { // stop the cleaner first if (cleaner != null) { Utils.swallow(cleaner.shutdown()) } // Span single shutdown thread for each data dir. val threadCounter : AtomicInteger = new AtomicInteger() // TODO: This must be configurable!!!! val semaphore : Semaphore = new Semaphore(8) val threads : ArrayBuffer[Thread] = new ArrayBuffer[Thread]() threads ++= logDirs.map( x => { val thread : Thread = new Thread(new Runnable { override def run(): Unit = { threadCounter.incrementAndGet() semaphore.acquire() try { val parent : String = x.toString val thisDirLogs : Seq[(TopicAndPartition,Log)] = logs.filter(_._2.dir.toString.startsWith(parent)).toSeq // flush the logs to ensure latest possible recovery point info("Flushing logs at " + parent) thisDirLogs.foreach(_._2.flush()) info("Closing logs at " + parent) // close the logs thisDirLogs.foreach(_._2.close()) // update the last flush point info("Updating recovery points " + parent) recoveryPointCheckpoints(x).write( thisDirLogs.groupBy(_._1).mapValues(_.head._2.recoveryPoint)) // mark that the shutdown was clean by creating the clean shutdown marker file info("Writing clean shutdown marker " + parent) Utils.swallow(new File(parent, Log.CleanShutdownFile).createNewFile()) } finally { semaphore.release() threadCounter.decrementAndGet() } } }) thread.start() thread }) // Wait them all threads.foreach(_.join()) // Check that all threads ended if (threadCounter.get() > 0) { error("Not all shut down threads ended.") } } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) } info("Shutdown complete.") 
} {code} However, there is evidence that running too many threads flushing at shutdown can cause the JVM to terminate due to native out of memory: {code} # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (malloc) failed to allocate 4088 bytes for AllocateHeap JVM exited unexpectedly while stopping the application {code} > Speedup broker startup after hard reset > --------------------------------------- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log > Affects Versions: 0.8.1 > Reporter: Dmitry Bugaychenko > Assignee: Jay Kreps > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easily > improved launching multiple threads (one per data directory, assuming that > typically each data directory is on a dedicated drive). Locally we tried this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** > * Recover and load all logs in the given data directories > */ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. 
Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)