ijuma commented on a change in pull request #8970:
URL: https://github.com/apache/kafka/pull/8970#discussion_r448723895



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -308,11 +312,11 @@ class LogManager(logDirs: Seq[File],
         threadPools.append(pool)
 
         val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
-
         if (cleanShutdownFile.exists) {
-          debug(s"Found clean shutdown file. Skipping recovery for all logs in 
data directory: ${dir.getAbsolutePath}")
+          info(s"Skipping recovery for all logs in $dir since clean shutdown 
file was found")

Review comment:
       Have we checked that `dir.toString` does what we want? If the path 
is relative, I think it will only print the relative path, not the absolute one.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -321,25 +325,32 @@ class LogManager(logDirs: Seq[File],
           recoveryPoints = this.recoveryPointCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading 
recovery-point-offset-checkpoint file of directory $dir", e)
-            warn("Resetting the recovery checkpoint to 0")
+            warn(s"Error occurred while reading 
recovery-point-offset-checkpoint file of directory $dir, " +
+              "resetting the recovery checkpoint to 0", e)
         }
 
         var logStartOffsets = Map[TopicPartition, Long]()
         try {
           logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading log-start-offset-checkpoint 
file of directory $dir", e)
+            warn(s"Error occurred while reading log-start-offset-checkpoint 
file of directory $dir, " +
+              "resetting to the base offset of the first segment", e)
         }
 
-        val jobsForDir = for {
-          dirContent <- Option(dir.listFiles).toList
-          logDir <- dirContent if logDir.isDirectory
-        } yield {
+        val logsToLoad = 
Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
+        val numRemainingLogsToLoad = new AtomicInteger(logsToLoad.length)
+
+        val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
-              loadLog(logDir, recoveryPoints, logStartOffsets)
+              debug(s"Loading log $logDir")
+              val logLoadStartMs = time.milliseconds()
+              val log = loadLog(logDir, recoveryPoints, logStartOffsets)
+              numRemainingLogsToLoad.decrementAndGet()

Review comment:
       Can we store the decremented count in a variable here and then use it in 
the log message below? Also, should we still decrement the value if an exception is thrown?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -379,7 +391,7 @@ class LogManager(logDirs: Seq[File],
       threadPools.foreach(_.shutdown())
     }
 
-    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
+    info(s"Log loading completed after ${time.milliseconds - startMs}ms.")

Review comment:
       Could we maybe say something like `Loaded n logs after...` where `n` is 
the number of logs we loaded?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -321,25 +325,32 @@ class LogManager(logDirs: Seq[File],
           recoveryPoints = this.recoveryPointCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading 
recovery-point-offset-checkpoint file of directory $dir", e)
-            warn("Resetting the recovery checkpoint to 0")
+            warn(s"Error occurred while reading 
recovery-point-offset-checkpoint file of directory $dir, " +
+              "resetting the recovery checkpoint to 0", e)
         }
 
         var logStartOffsets = Map[TopicPartition, Long]()
         try {
           logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
         } catch {
           case e: Exception =>
-            warn(s"Error occurred while reading log-start-offset-checkpoint 
file of directory $dir", e)
+            warn(s"Error occurred while reading log-start-offset-checkpoint 
file of directory $dir, " +
+              "resetting to the base offset of the first segment", e)
         }
 
-        val jobsForDir = for {
-          dirContent <- Option(dir.listFiles).toList
-          logDir <- dirContent if logDir.isDirectory
-        } yield {
+        val logsToLoad = 
Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
+        val numRemainingLogsToLoad = new AtomicInteger(logsToLoad.length)
+
+        val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             try {
-              loadLog(logDir, recoveryPoints, logStartOffsets)
+              debug(s"Loading log $logDir")
+              val logLoadStartMs = time.milliseconds()
+              val log = loadLog(logDir, recoveryPoints, logStartOffsets)
+              numRemainingLogsToLoad.decrementAndGet()
+              info(s"Completed load of $log with ${log.numberOfSegments} 
segments " +
+                s"in ${time.milliseconds() - logLoadStartMs}ms " +

Review comment:
       We should fix this and related code to use `hiResClockMs` since it's 
monotonic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to