ijuma commented on a change in pull request #8970:
URL: https://github.com/apache/kafka/pull/8970#discussion_r448723895
##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -308,11 +312,11 @@ class LogManager(logDirs: Seq[File],
         threadPools.append(pool)
         val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
-        if (cleanShutdownFile.exists) {
-          debug(s"Found clean shutdown file. Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}")
+          info(s"Skipping recovery for all logs in $dir since clean shutdown file was found")

Review comment:
   Have we checked that the `dir.toString` does what we want? If the path is relative, I think it will only print that.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -321,25 +325,32 @@ class LogManager(logDirs: Seq[File],
           recoveryPoints = this.recoveryPointCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
-            warn("Resetting the recovery checkpoint to 0")
+            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir, " +
+              "resetting the recovery checkpoint to 0", e)
         }
         var logStartOffsets = Map[TopicPartition, Long]()
         try {
           logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)
+            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir, " +
+              "resetting to the base offset of the first segment", e)
         }
-        val jobsForDir = for {
-          dirContent <- Option(dir.listFiles).toList
-          logDir <- dirContent if logDir.isDirectory
-        } yield {
+        val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
+        val numRemainingLogsToLoad = new AtomicInteger(logsToLoad.length)
+
+        val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
-              loadLog(logDir, recoveryPoints, logStartOffsets)
+              debug(s"Loading log $logDir")
+              val logLoadStartMs = time.milliseconds()
+              val log = loadLog(logDir, recoveryPoints, logStartOffsets)
+              numRemainingLogsToLoad.decrementAndGet()

Review comment:
   Can we keep the variable here and then use it in the log below? Also, should we decrement the value if there's an exception?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -379,7 +391,7 @@ class LogManager(logDirs: Seq[File],
       threadPools.foreach(_.shutdown())
     }
-    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
+    info(s"Log loading completed after ${time.milliseconds - startMs}ms.")

Review comment:
   Could we maybe say something like `Loaded n logs after...` where `n` is the number of logs we loaded?
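The two comments above suggest concrete changes to the loading runnable. The fragment below is a rough sketch of what they might look like, not the actual patch: `numRemainingLogsToLoad`, `logsToLoad`, `loadLog`, `startMs`, and `time` come from the diff, while the exact log wording and the `numLogsLoaded` summary counter are illustrative assumptions.

```scala
// Rough sketch only: keep the value returned by decrementAndGet() in a local so the
// log line and the counter cannot drift apart, and decrement on the failure path too.
val runnable: Runnable = () => {
  debug(s"Loading log $logDir")
  val logLoadStartMs = time.milliseconds()
  try {
    val log = loadLog(logDir, recoveryPoints, logStartOffsets)
    val remaining = numRemainingLogsToLoad.decrementAndGet()
    info(s"Completed load of $log with ${log.numberOfSegments} segments in " +
      s"${time.milliseconds() - logLoadStartMs}ms ($remaining logs remaining)")
  } catch {
    case e: Exception =>
      // Also decrement when loading fails so the remaining count stays meaningful.
      numRemainingLogsToLoad.decrementAndGet()
      throw e
  }
}

// The summary line at the end of loadLogs() could then report the total, e.g. with a
// hypothetical numLogsLoaded accumulator:
info(s"Loaded $numLogsLoaded logs in ${time.milliseconds() - startMs}ms.")
```

Decrementing in the catch (or a finally) keeps the counter accurate even when a log directory fails to load, which is what the second question in the comment is getting at.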
##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -321,25 +325,32 @@ class LogManager(logDirs: Seq[File],
           recoveryPoints = this.recoveryPointCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
-            warn("Resetting the recovery checkpoint to 0")
+            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir, " +
+              "resetting the recovery checkpoint to 0", e)
         }
         var logStartOffsets = Map[TopicPartition, Long]()
         try {
           logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)
+            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir, " +
+              "resetting to the base offset of the first segment", e)
         }
-        val jobsForDir = for {
-          dirContent <- Option(dir.listFiles).toList
-          logDir <- dirContent if logDir.isDirectory
-        } yield {
+        val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
+        val numRemainingLogsToLoad = new AtomicInteger(logsToLoad.length)
+
+        val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
-              loadLog(logDir, recoveryPoints, logStartOffsets)
+              debug(s"Loading log $logDir")
+              val logLoadStartMs = time.milliseconds()
+              val log = loadLog(logDir, recoveryPoints, logStartOffsets)
+              numRemainingLogsToLoad.decrementAndGet()
+              info(s"Completed load of $log with ${log.numberOfSegments} segments " +
+                s"in ${time.milliseconds() - logLoadStartMs}ms " +

Review comment:
   We should fix this and related code to use `hiResClockMs` since it's monotonic.
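A sketch of the monotonic-clock change being suggested, assuming `time` here is Kafka's `org.apache.kafka.common.utils.Time`, whose `hiResClockMs` is derived from `System.nanoTime` rather than the wall clock; the surrounding names are taken from the diff above.

```scala
// Rough sketch only: measure elapsed load time with the monotonic clock so the
// reported duration cannot go negative or jump if the system clock is adjusted mid-load.
val logLoadStartMs = time.hiResClockMs()
val log = loadLog(logDir, recoveryPoints, logStartOffsets)
info(s"Completed load of $log with ${log.numberOfSegments} segments " +
  s"in ${time.hiResClockMs() - logLoadStartMs}ms")
```

The `startMs` that feeds the overall "Log loading completed" summary line in the earlier hunk would presumably get the same treatment.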