cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373822529


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -407,36 +414,41 @@ object StorageTool extends Logging {
     if (directories.isEmpty) {
       throw new TerseFailure("No log directories found in the configuration.")
     }
-
-    val unformattedDirectories = directories.filter(directory => {
-      if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
-          true
-      } else if (!ignoreFormatted) {
-        throw new TerseFailure(s"Log directory $directory is already formatted. " +
-          "Use --ignore-formatted to ignore this directory and format the others.")
-      } else {
-        false
-      }
-    })
-    if (unformattedDirectories.isEmpty) {
+    val loader = new MetaPropertiesEnsemble.Loader()
+    directories.foreach(loader.addLogDir(_))
+    val metaPropertiesEnsemble = loader.load()
+    metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(),
+      util.EnumSet.noneOf(classOf[VerificationFlag]))
+
+    val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
+    if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
+      val firstLogDir = copier.logDirProps().keySet().iterator().next()
+      throw new TerseFailure(s"Log directory ${firstLogDir} directory is already formatted. " +
+        "Use --ignore-formatted to ignore this directory and format the others.")
+    }
+    if (!copier.errorLogDirs().isEmpty) {
+      val firstLogDir = copier.errorLogDirs().iterator().next()
+      throw new TerseFailure(s"I/O error trying to read log directory ${firstLogDir}.")
+    }
+    if (copier.emptyLogDirs().isEmpty) {
       stream.println("All of the log directories are already formatted.")
+    } else {
+      copier.emptyLogDirs().forEach(logDir => {
+        val newMetaProperties = new MetaProperties.Builder(metaProperties).
+          setDirectoryId(copier.generateValidDirectoryId()).
+          build()
+        copier.logDirProps().put(logDir, newMetaProperties)

Review Comment:
   It seems a bit cumbersome to put accessors on all the maps. But I'm open to 
ideas.
   
   I do wish Java had some way of flagging this map as different from the 
immutable ones. Kind of like `const` in C++, or yes, the whole menagerie of 
immutable/mutable Scala classes. Although that comes with its own set of 
problems.
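
To illustrate the point about Java's type system, here is a minimal sketch. `MapViews` and its two accessors are hypothetical names made up for this example. They are not the `MetaPropertiesEnsemble.Copier` API. Both accessors return the same static type, `Map<String, String>`, so the compiler cannot tell the writable map from the read-only view; the difference only shows up at runtime.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a copier-style class; not the Kafka API.
class MapViews {
    private final Map<String, String> logDirProps = new HashMap<>();

    // Returns the live, mutable map: callers may put() into it.
    Map<String, String> mutableLogDirProps() {
        return logDirProps;
    }

    // Returns a read-only view with exactly the same static type.
    Map<String, String> readOnlyLogDirProps() {
        return Collections.unmodifiableMap(logDirProps);
    }

    // Probes a map at runtime: true if put() is rejected.
    static boolean rejectsWrites(Map<String, String> map) {
        try {
            map.put("/tmp/probe", "x");
            map.remove("/tmp/probe"); // undo the probe on writable maps
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        MapViews views = new MapViews();
        views.mutableLogDirProps().put("/tmp/kafka-logs", "example");
        System.out.println(rejectsWrites(views.mutableLogDirProps()));  // false
        System.out.println(rejectsWrites(views.readOnlyLogDirProps())); // true
    }
}
```

Nothing in the signatures distinguishes the two accessors, so the only signals available to a caller are the method name and the documentation.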


