muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1665644755


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -102,6 +111,23 @@ object StorageTool extends Logging {
       setClusterId(clusterId).
       setNodeId(config.nodeId).
       build()
+    val standaloneMode = namespace.getBoolean("standalone")
+    var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = 
List()
+
+    val controllersQuorumVoters = 
namespace.getString("controller_quorum_voters")
+    if(standaloneMode && controllersQuorumVoters != null) {
+      throw new TerseFailure("Both --standalone and --controller-quorum-voters 
were set. Only one of the two flags can be set.")
+    }
+
+    if (standaloneMode) {
+      advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+    } else if(controllersQuorumVoters != null) {
+      if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
+        throw new TerseFailure("Expected schema for --controller-quorum-voters 
is <replica-id>[-<replica-directory-id>]@<host>:<port>")
+      }
+      advertisedListenerEndpoints = 
config.effectiveAdvertisedControllerListeners

Review Comment:
   Fixed.



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -285,6 +362,49 @@ Found problem:
       "Expected the default metadata.version to be 3.3-IV2")
   }
 
+  @Test
+  def testStandaloneModeWithArguments(): Unit = {
+    val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT",
+    "-s"))
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, 
null)))
+    val exitCode = StorageTool.runFormatCommand(namespace, config)
+    val tempDirs = config.logDirs
+    tempDirs.foreach(tempDir => {
+      val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME
+      val checkpointFilePath = 
Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID)
+      assertTrue(checkpointFilePath.toFile.exists)
+      
assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost"))
+      Utils.delete(new File(tempDir))
+    })
+    assertEquals(0, exitCode)
+  }
+
+//  @Test TODO
+//  def testControllerQuorumVotersWithArguments(): Unit = {
+//    val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT",
+//      "--controller-quorum-voters", "1@localhost:9092"))
+//    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, 
null)))

Review Comment:
   Not sure how to set controller listener



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to