[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13981025#comment-13981025
 ] 

Dmitry Bugaychenko commented on KAFKA-1414:
-------------------------------------------

Actually, a similar improvement is needed for shutdown too - on a 20-disk 
machine with no forced flush during processing it can take more than 10 
minutes to shut down and flush the logs in a single thread. We ended up with 
the following workaround:

{code}
  /**
   * Flush and close all logs, one worker thread per data directory, then
   * destroy the directory locks.
   *
   * For every entry of logDirs a thread flushes and closes the logs whose
   * parent directory is that entry, writes their recovery points to the
   * directory's checkpoint and creates the clean shutdown marker file. At
   * most 8 directories are processed at the same time. The calling thread
   * blocks until every worker has finished; the directory locks are
   * destroyed even if something above fails.
   */
  def shutdown() {
    info("Shutting down.")
    try {
      // stop the cleaner first
      if (cleaner != null) {
        Utils.swallow(cleaner.shutdown())
      }

      // Workers that have entered run() but not yet executed their finally.
      val threadCounter : AtomicInteger = new AtomicInteger()

      // Caps the number of directories being flushed concurrently.
      // TODO: This must be configurable!!!!
      val semaphore : Semaphore = new Semaphore(8)

      // Spawn a single shutdown thread for each data dir.
      val threads : ArrayBuffer[Thread] = new ArrayBuffer[Thread]()
      threads ++= logDirs.map(
        x => {
          val thread : Thread = new Thread(new Runnable {
            override def run(): Unit = {
              threadCounter.incrementAndGet()

              semaphore.acquire()
              try {
                val parent : String = x.toString
                // Select by parent directory, not by path prefix: a prefix
                // test lets "/data/1" also claim the logs of "/data/10",
                // so they would be flushed, closed and checkpointed twice.
                val thisDirLogs : Seq[(TopicAndPartition,Log)] =
                  logs.filter(_._2.dir.getParent == parent).toSeq

                // flush the logs to ensure latest possible recovery point
                info("Flushing logs at " + parent)
                thisDirLogs.foreach(_._2.flush())
                info("Closing logs at " + parent)
                // close the logs
                thisDirLogs.foreach(_._2.close())

                // update the last flush point
                info("Updating recovery points " + parent)
                recoveryPointCheckpoints(x).write(
                  thisDirLogs.map(e => e._1 -> e._2.recoveryPoint).toMap)

                // mark that the shutdown was clean by creating the clean
                // shutdown marker file
                info("Writing clean shutdown marker " + parent)
                Utils.swallow(
                  new File(parent, Log.CleanShutdownFile).createNewFile())
              } finally {
                semaphore.release()
                threadCounter.decrementAndGet()
              }
            }
          })

          thread.start()
          thread
        })

      // Wait them all
      threads.foreach(_.join())

      // Check that all threads ended. The counter is still positive only if
      // a worker left run() without reaching its finally block, e.g. because
      // semaphore.acquire() threw.
      if (threadCounter.get() > 0) {
        error("Not all shut down threads ended.")
      }
    } finally {
      // regardless of whether the close succeeded, we need to unlock the
      // data directories
      dirLocks.foreach(_.destroy())
    }
    info("Shutdown complete.")
  }
{code}

However, there is evidence that running too many threads flushing at 
shutdown can cause the JVM to terminate due to native out of memory:
{code}
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 4088 bytes for 
AllocateHeap
JVM exited unexpectedly while stopping the application
{code}

> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>
> After a hard reset due to power failure the broker takes way too much time 
> recovering unflushed segments in a single thread. This could be easily 
> improved by launching multiple threads (one per data directory, assuming that 
> typically each data directory is on a dedicated drive). Locally we tried this 
> simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
> to Scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories.
>    *
>    * Starts one thread per entry of dirs and joins all of them before
>    * returning. Each thread reads the recovery point checkpoint of its
>    * directory, creates a Log for every sub-directory (using the stored
>    * recovery point, or 0 when none is recorded) and registers it through
>    * addLogWithLock, then deletes the clean shutdown marker file.
>    *
>    * When addLogWithLock returns a non-null previous log the loading thread
>    * throws IllegalArgumentException; the exception is raised inside that
>    * thread's run(), not in the thread that called loadLogs.
>    *
>    * @param dirs the data directories to load
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads : Array[Thread] = new Array[Thread](dirs.size)
>     var i: Int = 0
>     val me = this // outer instance, referenced from inside the Runnable
>     for(dir <- dirs) {
>       val thread = new Thread( new Runnable {
>         def run()
>         {
>           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
>           /* load the logs */
>           val subDirs = dir.listFiles()
>           if(subDirs != null) {
>             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>             if(cleanShutDownFile.exists())
>               info("Found clean shutdown file. Skipping recovery for all logs 
> in data directory '%s'".format(dir.getAbsolutePath))
>             for(dir <- subDirs) { // NOTE: this dir shadows the outer data dir
>               if(dir.isDirectory) {
>                 info("Loading log '" + dir.getName + "'")
>                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
>                 val config = topicConfigs.getOrElse(topicPartition.topic, 
> defaultConfig)
>                 val log = new Log(dir,
>                   config,
>                   recoveryPoints.getOrElse(topicPartition, 0L),
>                   scheduler,
>                   time)
>                 val previous = addLogWithLock(topicPartition, log)
>                 if(previous != null)
>                   throw new IllegalArgumentException("Duplicate log 
> directories found: %s, %s!".format(log.dir.getAbsolutePath, 
> previous.dir.getAbsolutePath))
>               }
>             }
>             cleanShutDownFile.delete()
>           }
>         }
>       })
>       thread.start()
>       threads(i) = thread
>       i = i + 1
>     }
>     for(thread <- threads) {
>       thread.join()
>     }
>   }
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized { // hold the creation/deletion lock while registering
>       this.logs.put(topicPartition, log) // put's result is returned; loadLogs treats non-null as a duplicate
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to