jsancio commented on code in PR #16325: URL: https://github.com/apache/kafka/pull/16325#discussion_r1668893365
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -300,19 +301,19 @@ object StorageTool extends Logging { } private def getUserScramCredentialRecord( - mechanism: String, - config: String - ) : UserScramCredentialRecord = { + mechanism: String, + config: String + ) : UserScramCredentialRecord = { /* * Remove '[' amd ']' * Split K->V pairs on ',' and no K or V should contain ',' * Split K and V on '=' but V could contain '=' if inside "" * Create Map of K to V and replace all " in V */ val argMap = config.substring(1, config.length - 1) - .split(",") - .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) - .map(args => args(0) -> args(1).replaceAll("\"", "")).toMap + .split(",") + .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) + .map(args => args(0) -> args(1).replaceAll("\"", "")).toMap Review Comment: Again, this is not the correct indentation. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -112,20 +112,21 @@ object StorageTool extends Logging { setNodeId(config.nodeId). build() val standaloneMode = namespace.getBoolean("standalone") - var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List() val controllersQuorumVoters = namespace.getString("controller_quorum_voters") if(standaloneMode && controllersQuorumVoters != null) { throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. 
Only one of the two flags can be set.") } + var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() if (standaloneMode) { - advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners + listeners = createStandaloneVoterMap(config) } else if(controllersQuorumVoters != null) { if (!validateControllerQuorumVoters(controllersQuorumVoters)) { throw new TerseFailure("Expected schema for --controller-quorum-voters is <replica-id>[-<replica-directory-id>]@<host>:<port>") } - advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners + val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters)) + listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config) Review Comment: This doesn't look correct. `VoterSet` is basically a `Map[Integer, (Uuid, Map[ListenerName, InetSocketAddress])]` where `Integer` is the replica id and `Uuid` is the replica directory id. The type for listener doesn't contain all of the information needed to generate a `VoterSet` for all the possible configuration cases. Note that the value for `--controller-quorum-voters` has the following schema: `<replica-id>-<replica-directory-id>@<host>:<port>` ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -300,19 +301,19 @@ object StorageTool extends Logging { } private def getUserScramCredentialRecord( - mechanism: String, - config: String - ) : UserScramCredentialRecord = { + mechanism: String, + config: String + ) : UserScramCredentialRecord = { Review Comment: Extra space between `)` and `:`. It should be `): User... 
= {` ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -335,10 +336,10 @@ object StorageTool extends Logging { if (argMap.contains("salt")) { val iterations = argMap("iterations").toInt if (iterations < scramMechanism.minIterations()) { - throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram") + throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram") } if (iterations > scramMechanism.maxIterations()) { - throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram") + throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram") Review Comment: Looks like you have two extra spaces! ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -506,8 +507,8 @@ object StorageTool extends Logging { val metadataRecords = new util.ArrayList[ApiMessageAndVersion] metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord(). - setName(MetadataVersion.FEATURE_NAME). - setFeatureLevel(metadataVersion.featureLevel()), 0.toShort)) + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(metadataVersion.featureLevel()), 0.toShort)) Review Comment: Incorrect indentation. 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -347,22 +348,22 @@ object StorageTool extends Logging { } def getSaltedPassword( - argMap: Map[String,String], - scramMechanism : ScramMechanism, - salt : Array[Byte], - iterations: Int - ) : Array[Byte] = { + argMap: Map[String,String], + scramMechanism : ScramMechanism, + salt : Array[Byte], + iterations: Int + ) : Array[Byte] = { if (argMap.contains("password")) { if (argMap.contains("saltedpassword")) { - throw new TerseFailure(s"You must only supply one of 'password' or 'saltedpassword' to add-scram") + throw new TerseFailure(s"You must only supply one of 'password' or 'saltedpassword' to add-scram") } new ScramFormatter(scramMechanism).saltedPassword(argMap("password"), salt, iterations) } else { if (!argMap.contains("saltedpassword")) { - throw new TerseFailure(s"You must supply one of 'password' or 'saltedpassword' to add-scram") + throw new TerseFailure(s"You must supply one of 'password' or 'saltedpassword' to add-scram") } if (!argMap.contains("salt")) { - throw new TerseFailure(s"You must supply 'salt' with 'saltedpassword' to add-scram") + throw new TerseFailure(s"You must supply 'salt' with 'saltedpassword' to add-scram") Review Comment: Extra spaces. 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -698,6 +663,42 @@ object StorageTool extends Logging { voterSet } + def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { + val advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners + val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() + advertisedListenerEndpoints.foreach(endpoint => { + val host: String = endpoint.host + listeners.put(endpoint.listenerName, new InetSocketAddress(host, endpoint.port)) + }) + listeners + } + + private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress], + metaProperties: MetaProperties, + config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { + val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() + controllerQuorumVoterMap.keySet().forEach(replicaId => { + if (metaProperties.nodeId().getAsInt == replicaId) { + val listenerNameOption = config.effectiveAdvertisedControllerListeners. Review Comment: We should assume that the listener name is the default listener name. The default listener name is the first listener in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L730-L737. For an example of getting the first listener name, see this code: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L267 There is one validation that we should do for the local replica: the local replica's default listener (name, host and port) must match the entry specified in `--controller-quorum-voters`. ########## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ########## @@ -713,7 +733,7 @@ Found problem: StorageTool.main(args) } catch { case e: StorageToolTestException => assertEquals("", exitString) - assertEquals(0, exitStatus) + assertEquals(0, exitStatus) Review Comment: Incorrect indentation! 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -199,7 +200,7 @@ object StorageTool extends Logging { val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault)) // Only set feature records for levels greater than 0. 0 is assumed if there is no record. Throw an error if level < 0. if (level != 0) { - allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level, unstableFeatureVersionsEnabled)) + allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level, unstableFeatureVersionsEnabled)) Review Comment: Looks like we are missing a space. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -347,22 +348,22 @@ object StorageTool extends Logging { } def getSaltedPassword( - argMap: Map[String,String], - scramMechanism : ScramMechanism, - salt : Array[Byte], - iterations: Int - ) : Array[Byte] = { + argMap: Map[String,String], + scramMechanism : ScramMechanism, + salt : Array[Byte], + iterations: Int + ) : Array[Byte] = { Review Comment: There shouldn't be a space between `)` and `:`. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -377,12 +378,12 @@ object StorageTool extends Logging { val formatter = new ScramFormatter(scramMechanism) new UserScramCredentialRecord() - .setName(name) - .setMechanism(scramMechanism.`type`) - .setSalt(salt) - .setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword))) - .setServerKey(formatter.serverKey(saltedPassword)) - .setIterations(iterations) + .setName(name) + .setMechanism(scramMechanism.`type`) + .setSalt(salt) + .setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword))) + .setServerKey(formatter.serverKey(saltedPassword)) + .setIterations(iterations) Review Comment: For Scala in Kafka we indent two spaces. 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -698,6 +663,42 @@ object StorageTool extends Logging { voterSet } + def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { + val advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners + val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() + advertisedListenerEndpoints.foreach(endpoint => { + val host: String = endpoint.host + listeners.put(endpoint.listenerName, new InetSocketAddress(host, endpoint.port)) + }) + listeners + } + + private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress], + metaProperties: MetaProperties, + config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { Review Comment: Incorrect indentation: ```java private def parseControllerQuorumVotersMap( controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress], metaProperties: MetaProperties, config: KafkaConfig ): util.Map[ListenerName, InetSocketAddress] = { ``` Aren't you losing information if you map from `util.Map[Integer, InetSocketAddress]` to `util.Map[ListenerName, InetSocketAddress]`? Why are you removing replicas that are not the local replica? The `VoterSet` must contain all of the voters in `--controller-quorum-voters` not just the local replica. Why would Kafka require all of the voters in `--controller-quorum-voters` to only use the local voter? ########## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ########## @@ -598,9 +618,9 @@ Found problem: // Validate we can add multiple SCRAM creds. 
scramRecords = parseAddScram("-S", - "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]", - "-S", - "SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]") + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]", + "-S", + "SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]") Review Comment: Incorrect indentation. ########## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ########## @@ -342,8 +340,8 @@ Found problem: val config = new KafkaConfig(newSelfManagedProperties()) assertEquals("Cluster ID string invalid does not appear to be a valid UUID: " + "Input string `invalid` decoded as 5 bytes, which is not equal to the expected " + - "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure], - () => StorageTool.buildMetadataProperties("invalid", config)).getMessage) + "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure], + () => StorageTool.buildMetadataProperties("invalid", config)).getMessage) Review Comment: Indentation doesn't look correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org