rajinisivaram commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r465952166



##########
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##########
@@ -508,7 +563,15 @@ object ConfigCommand extends Config {
 
       val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ 
entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ")
       val entriesStr = entries.asScala.map(e => 
s"${e._1}=${e._2}").mkString(", ")
-      println(s"Configs for ${entityStr} are ${entriesStr}")
+      println(s"Quota configs for ${entityStr} are ${entriesStr}")
+    }
+    // we describe user SCRAM credentials only when we are not describing 
client information
+    // and we are not given either --entity-default or --user-defaults
+    if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) 
{
+      getUserScramCredentialConfigs(adminClient, entityNames).foreach { case 
(user, description) =>

Review comment:
       What do we do if the user is not a SCRAM user? Won't this throw an 
exception? Can we make sure that user without quota or SCRAM credential doesn't 
print any errors or exceptions?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig,
       entry.entity -> apiError
     }.toMap
   }
+
+  def describeUserScramCredentials(users: Option[Seq[String]]): 
DescribeUserScramCredentialsResponseData = {
+    val retval = new DescribeUserScramCredentialsResponseData()
+
+    def addToResults(user: String, userConfig: Properties) = {
+      val configKeys = userConfig.stringPropertyNames
+      val hasScramCredential = !ScramMechanism.values().toList.filter(key => 
key != ScramMechanism.UNKNOWN && 
configKeys.contains(key.getMechanismName)).isEmpty
+      if (hasScramCredential) {
+        val userScramCredentials = new UserScramCredential().setName(user)
+        ScramMechanism.values().foreach(mechanism => if (mechanism != 
ScramMechanism.UNKNOWN) {
+          val propertyValue = 
userConfig.getProperty(mechanism.getMechanismName)
+          if (propertyValue != null) {
+            val iterations = 
ScramCredentialUtils.credentialFromString(propertyValue).iterations
+            userScramCredentials.credentialInfos.add(new 
CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations))
+          }
+        })
+        retval.userScramCredentials.add(userScramCredentials)
+      }
+    }
+
+    if (!users.isDefined || users.get.isEmpty)
+      // describe all users
+      adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case 
(user, properties) => addToResults(user, properties) }
+    else {
+      // describe specific users
+      // 
https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list
+      val duplicatedUsers = users.get.groupBy(identity).collect { case (x, 
Seq(_, _, _*)) => x }
+      if (duplicatedUsers.nonEmpty) {
+        retval.setError(Errors.INVALID_REQUEST.code())
+        retval.setErrorMessage(s"Cannot describe SCRAM credentials for the 
same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", 
"]")}")
+      } else
+        users.get.foreach { user => addToResults(user, 
adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) }

Review comment:
       should we catch exception?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = 
response.data();
+                        
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.fromType(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+                                        .collect(Collectors.toList());
+                                return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     
AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new 
HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new 
InvalidRequestException("Unknown SCRAM mechanism"));
+            }
+        });
+        // Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+        // so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+        final Map<String, Map<ScramMechanism, 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions 
= new HashMap<>();
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                .filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+                .forEach(alteration -> {
+                    UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+                    String user = upsertion.getUser();
+                    try {
+                        ScramMechanism mechanism = 
upsertion.getInfo().getMechanism();
+                        if (mechanism == null || mechanism == 
ScramMechanism.UNKNOWN)
+                            throw new InvalidRequestException("Unknown SCRAM 
mechanism");
+                        userInsertions.putIfAbsent(user, new HashMap<>());
+                        userInsertions.get(user).put(mechanism, 
getScramCredentialUpsertion(upsertion));
+                    } catch (Exception e) {
+                        // we might overwrite an exception from a previous 
upsertion, but we don't really care
+                        // since we just need to mark this user as having at 
least one illegal alteration
+                        // and make an exception instance available for 
completing the corresponding future exceptionally
+                        userIllegalAlterationExceptions.put(user, e);
+                    }
+                });
+        // fail any users immediately that have an illegal alteration as 
identified above
+        userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> {
+            
futures.get(entry.getKey()).completeExceptionally(entry.getValue());
+        });
+
+        // submit alterations for users that do not have an illegal upsertion 
as identified above
+        Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, 
options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public AlterUserScramCredentialsRequest.Builder createRequest(int 
timeoutMs) {
+                return new AlterUserScramCredentialsRequest.Builder(
+                        new 
AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
+                                .filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.getUser()))
+                                .map(a -> 
userInsertions.get(a.getUser()).get(((UserScramCredentialUpsertion) 
a).getInfo().getMechanism()))
+                                .collect(Collectors.toList()))
+                        .setDeletions(alterations.stream()
+                                .filter(a -> a instanceof 
UserScramCredentialDeletion)
+                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.getUser()))
+                                .map(d ->
+                                
getScramCredentialDeletion((UserScramCredentialDeletion) 
d)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                AlterUserScramCredentialsResponse response = 
(AlterUserScramCredentialsResponse) abstractResponse;
+                // Check for controller change
+                for (Errors error : response.errorCounts().keySet()) {
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                    }
+                }
+                response.data().results().forEach(result -> {
+                    KafkaFutureImpl<Void> future = futures.get(result.user());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown user {}", 
result.user());
+                    } else {
+                        Errors error = Errors.forCode(result.errorCode());
+                        if (error != Errors.NONE) {
+                            future.completeExceptionally(error.exception());
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                });
+                completeUnrealizedFutures(
+                    futures.entrySet().stream(),
+                    user -> "The broker response did not contain a result for 
user " + user);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new AlterUserScramCredentialsResult(new HashMap<>(futures));
+    }
+
+    private static 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion 
getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws 
InvalidKeyException, NoSuchAlgorithmException {
+        AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = 
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion();
+        return retval.setName(u.getUser())
+                .setMechanism(u.getInfo().getMechanism().getType())
+                .setIterations(u.getInfo().getIterations())
+                .setSalt(u.getSalt())
+                
.setSaltedPassword(getSaltedPasword(u.getInfo().getMechanism(), 
u.getPassword(), u.getSalt(), u.getInfo().getIterations()));
+    }
+
+    private static 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion 
getScramCredentialDeletion(UserScramCredentialDeletion d) {
+        return new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(d.getUser()).setMechanism(d.getMechanism().getType());
+    }
+
+    private static byte[] getSaltedPasword(ScramMechanism 
publicScramMechanism, byte[] password, byte[] salt, int interations) throws 
NoSuchAlgorithmException, InvalidKeyException {

Review comment:
       typo: iterations

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2973,6 +2975,40 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDescribeUserScramCredentialsRequest(request: 
RequestChannel.Request): Unit = {
+    val describeUserScramCredentialsRequest = 
request.body[DescribeUserScramCredentialsRequest]
+
+    if (!controller.isActive) {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception))
+    } else if (authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
+      val result = adminManager.describeUserScramCredentials(
+        
Option(describeUserScramCredentialsRequest.data.users.asScala.map(_.name).toList))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new 
DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
+    } else {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    }
+  }
+
+  def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): 
Unit = {
+    val alterUserScramCredentialsRequest = 
request.body[AlterUserScramCredentialsRequest]
+
+    if (!controller.isActive) {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception))
+    } else if (authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {

Review comment:
       I think we should also not allow users who authenticated using 
delegation tokens to create or update users. We don't allow these users to 
create new tokens, it would be odd if they could create a new user or the 
password of the user of the token.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {

Review comment:
       Does this need to be controller for Describe? It looks similar to 
DescribeAcls where we use LeastLoadedNode.

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig,
       entry.entity -> apiError
     }.toMap
   }
+
+  def describeUserScramCredentials(users: Option[Seq[String]]): 
DescribeUserScramCredentialsResponseData = {
+    val retval = new DescribeUserScramCredentialsResponseData()
+
+    def addToResults(user: String, userConfig: Properties) = {
+      val configKeys = userConfig.stringPropertyNames
+      val hasScramCredential = !ScramMechanism.values().toList.filter(key => 
key != ScramMechanism.UNKNOWN && 
configKeys.contains(key.getMechanismName)).isEmpty
+      if (hasScramCredential) {
+        val userScramCredentials = new UserScramCredential().setName(user)
+        ScramMechanism.values().foreach(mechanism => if (mechanism != 
ScramMechanism.UNKNOWN) {
+          val propertyValue = 
userConfig.getProperty(mechanism.getMechanismName)
+          if (propertyValue != null) {
+            val iterations = 
ScramCredentialUtils.credentialFromString(propertyValue).iterations
+            userScramCredentials.credentialInfos.add(new 
CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations))
+          }
+        })
+        retval.userScramCredentials.add(userScramCredentials)
+      }
+    }
+
+    if (!users.isDefined || users.get.isEmpty)
+      // describe all users
+      adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case 
(user, properties) => addToResults(user, properties) }
+    else {
+      // describe specific users
+      // 
https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list
+      val duplicatedUsers = users.get.groupBy(identity).collect { case (x, 
Seq(_, _, _*)) => x }
+      if (duplicatedUsers.nonEmpty) {
+        retval.setError(Errors.INVALID_REQUEST.code())
+        retval.setErrorMessage(s"Cannot describe SCRAM credentials for the 
same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", 
"]")}")
+      } else
+        users.get.foreach { user => addToResults(user, 
adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) }
+    }
+    retval
+  }
+
+  def alterUserScramCredentials(upsertions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion],
+                                deletions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): 
AlterUserScramCredentialsResponseData = {
+
+    def scramMechanism(mechanism: Byte): ScramMechanism = {
+      ScramMechanism.fromType(mechanism)
+    }
+
+    def mechanismName(mechanism: Byte): String = {
+      scramMechanism(mechanism).getMechanismName
+    }
+
+    val retval = new AlterUserScramCredentialsResponseData()
+
+    // fail any user that is invalid due to an empty user name, an unknown 
SCRAM mechanism, or not enough iterations
+    val invalidUsers = (
+      upsertions.filter(upsertion => {
+        if (upsertion.name.isEmpty)
+          true
+        else {
+          val publicScramMechanism = scramMechanism(upsertion.mechanism)
+          publicScramMechanism == ScramMechanism.UNKNOWN ||
+            (upsertion.iterations != -1 &&
+              
InternalScramMechanism.forMechanismName(publicScramMechanism.getMechanismName).minIterations
 > upsertion.iterations)
+        }
+      }).map(_.name) ++ deletions.filter(deletions => deletions.name.isEmpty 
|| scramMechanism(deletions.mechanism) == ScramMechanism.UNKNOWN).map(_.name))
+      .toSet
+    invalidUsers.foreach(user => retval.results.add(new 
AlterUserScramCredentialsResult().setUser(user)
+      .setErrorCode(Errors.INVALID_REQUEST.code).setErrorMessage("Unknown 
SCRAM mechanism or too few iterations")))

Review comment:
       Can't we return an exception message that says exactly why, instead of 
`x or y`

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig,
       entry.entity -> apiError
     }.toMap
   }
+
+  def describeUserScramCredentials(users: Option[Seq[String]]): 
DescribeUserScramCredentialsResponseData = {
+    val retval = new DescribeUserScramCredentialsResponseData()
+
+    def addToResults(user: String, userConfig: Properties) = {
+      val configKeys = userConfig.stringPropertyNames
+      val hasScramCredential = !ScramMechanism.values().toList.filter(key => 
key != ScramMechanism.UNKNOWN && 
configKeys.contains(key.getMechanismName)).isEmpty
+      if (hasScramCredential) {
+        val userScramCredentials = new UserScramCredential().setName(user)
+        ScramMechanism.values().foreach(mechanism => if (mechanism != 
ScramMechanism.UNKNOWN) {
+          val propertyValue = 
userConfig.getProperty(mechanism.getMechanismName)
+          if (propertyValue != null) {
+            val iterations = 
ScramCredentialUtils.credentialFromString(propertyValue).iterations
+            userScramCredentials.credentialInfos.add(new 
CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations))
+          }
+        })
+        retval.userScramCredentials.add(userScramCredentials)
+      }
+    }
+
+    if (!users.isDefined || users.get.isEmpty)
+      // describe all users
+      adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case 
(user, properties) => addToResults(user, properties) }
+    else {
+      // describe specific users
+      // 
https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list
+      val duplicatedUsers = users.get.groupBy(identity).collect { case (x, 
Seq(_, _, _*)) => x }
+      if (duplicatedUsers.nonEmpty) {
+        retval.setError(Errors.INVALID_REQUEST.code())
+        retval.setErrorMessage(s"Cannot describe SCRAM credentials for the 
same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", 
"]")}")
+      } else
+        users.get.foreach { user => addToResults(user, 
adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) }
+    }
+    retval
+  }
+
+  def alterUserScramCredentials(upsertions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion],
+                                deletions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): 
AlterUserScramCredentialsResponseData = {
+
+    def scramMechanism(mechanism: Byte): ScramMechanism = {
+      ScramMechanism.fromType(mechanism)
+    }
+
+    def mechanismName(mechanism: Byte): String = {
+      scramMechanism(mechanism).getMechanismName
+    }
+
+    val retval = new AlterUserScramCredentialsResponseData()
+
+    // fail any user that is invalid due to an empty user name, an unknown 
SCRAM mechanism, or not enough iterations
+    val invalidUsers = (
+      upsertions.filter(upsertion => {
+        if (upsertion.name.isEmpty)
+          true
+        else {
+          val publicScramMechanism = scramMechanism(upsertion.mechanism)
+          publicScramMechanism == ScramMechanism.UNKNOWN ||
+            (upsertion.iterations != -1 &&
+              
InternalScramMechanism.forMechanismName(publicScramMechanism.getMechanismName).minIterations
 > upsertion.iterations)

Review comment:
       Shouldn't we also limit the max value for iterations?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig,
       entry.entity -> apiError
     }.toMap
   }
+
+  def describeUserScramCredentials(users: Option[Seq[String]]): 
DescribeUserScramCredentialsResponseData = {
+    val retval = new DescribeUserScramCredentialsResponseData()
+
+    def addToResults(user: String, userConfig: Properties) = {
+      val configKeys = userConfig.stringPropertyNames
+      val hasScramCredential = !ScramMechanism.values().toList.filter(key => 
key != ScramMechanism.UNKNOWN && 
configKeys.contains(key.getMechanismName)).isEmpty
+      if (hasScramCredential) {
+        val userScramCredentials = new UserScramCredential().setName(user)
+        ScramMechanism.values().foreach(mechanism => if (mechanism != 
ScramMechanism.UNKNOWN) {
+          val propertyValue = 
userConfig.getProperty(mechanism.getMechanismName)
+          if (propertyValue != null) {
+            val iterations = 
ScramCredentialUtils.credentialFromString(propertyValue).iterations
+            userScramCredentials.credentialInfos.add(new 
CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations))
+          }
+        })
+        retval.userScramCredentials.add(userScramCredentials)
+      }
+    }
+
+    if (!users.isDefined || users.get.isEmpty)
+      // describe all users
+      adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case 
(user, properties) => addToResults(user, properties) }
+    else {
+      // describe specific users
+      // 
https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list
+      val duplicatedUsers = users.get.groupBy(identity).collect { case (x, 
Seq(_, _, _*)) => x }
+      if (duplicatedUsers.nonEmpty) {
+        retval.setError(Errors.INVALID_REQUEST.code())
+        retval.setErrorMessage(s"Cannot describe SCRAM credentials for the 
same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", 
"]")}")
+      } else
+        users.get.foreach { user => addToResults(user, 
adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) }
+    }
+    retval
+  }
+
+  def alterUserScramCredentials(upsertions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion],
+                                deletions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): 
AlterUserScramCredentialsResponseData = {
+
+    def scramMechanism(mechanism: Byte): ScramMechanism = {
+      ScramMechanism.fromType(mechanism)
+    }
+
+    def mechanismName(mechanism: Byte): String = {
+      scramMechanism(mechanism).getMechanismName
+    }
+
+    val retval = new AlterUserScramCredentialsResponseData()
+
+    // fail any user that is invalid due to an empty user name, an unknown 
SCRAM mechanism, or not enough iterations
+    val invalidUsers = (
+      upsertions.filter(upsertion => {
+        if (upsertion.name.isEmpty)
+          true
+        else {
+          val publicScramMechanism = scramMechanism(upsertion.mechanism)
+          publicScramMechanism == ScramMechanism.UNKNOWN ||
+            (upsertion.iterations != -1 &&
+              
InternalScramMechanism.forMechanismName(publicScramMechanism.getMechanismName).minIterations
 > upsertion.iterations)
+        }
+      }).map(_.name) ++ deletions.filter(deletions => deletions.name.isEmpty 
|| scramMechanism(deletions.mechanism) == ScramMechanism.UNKNOWN).map(_.name))
+      .toSet
+    invalidUsers.foreach(user => retval.results.add(new 
AlterUserScramCredentialsResult().setUser(user)
+      .setErrorCode(Errors.INVALID_REQUEST.code).setErrorMessage("Unknown 
SCRAM mechanism or too few iterations")))
+
+    val initiallyValidUserMechanismPairs = (upsertions.filter(upsertion => 
!invalidUsers.contains(upsertion.name)).map(upsertion => (upsertion.name, 
upsertion.mechanism)) ++
+      deletions.filter(deletion => 
!invalidUsers.contains(deletion.name)).map(deletion => (deletion.name, 
deletion.mechanism)))
+
+    // 
https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list

Review comment:
       Do we include this kind of comment in code?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -980,4 +984,137 @@ class AdminManager(val config: KafkaConfig,
       entry.entity -> apiError
     }.toMap
   }
+
+  def describeUserScramCredentials(users: Option[Seq[String]]): 
DescribeUserScramCredentialsResponseData = {
+    val retval = new DescribeUserScramCredentialsResponseData()
+
+    def addToResults(user: String, userConfig: Properties) = {
+      val configKeys = userConfig.stringPropertyNames
+      val hasScramCredential = !ScramMechanism.values().toList.filter(key => 
key != ScramMechanism.UNKNOWN && 
configKeys.contains(key.getMechanismName)).isEmpty
+      if (hasScramCredential) {
+        val userScramCredentials = new UserScramCredential().setName(user)
+        ScramMechanism.values().foreach(mechanism => if (mechanism != 
ScramMechanism.UNKNOWN) {
+          val propertyValue = 
userConfig.getProperty(mechanism.getMechanismName)
+          if (propertyValue != null) {
+            val iterations = 
ScramCredentialUtils.credentialFromString(propertyValue).iterations
+            userScramCredentials.credentialInfos.add(new 
CredentialInfo().setMechanism(mechanism.getType).setIterations(iterations))
+          }
+        })
+        retval.userScramCredentials.add(userScramCredentials)
+      }
+    }
+
+    if (!users.isDefined || users.get.isEmpty)
+      // describe all users
+      adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach { case 
(user, properties) => addToResults(user, properties) }
+    else {
+      // describe specific users
+      // 
https://stackoverflow.com/questions/24729544/how-to-find-duplicates-in-a-list
+      val duplicatedUsers = users.get.groupBy(identity).collect { case (x, 
Seq(_, _, _*)) => x }
+      if (duplicatedUsers.nonEmpty) {
+        retval.setError(Errors.INVALID_REQUEST.code())
+        retval.setErrorMessage(s"Cannot describe SCRAM credentials for the 
same user twice in a single request: ${duplicatedUsers.mkString("[", ", ", 
"]")}")
+      } else
+        users.get.foreach { user => addToResults(user, 
adminZkClient.fetchEntityConfig(ConfigType.User, Sanitizer.sanitize(user))) }
+    }
+    retval
+  }
+
+  def alterUserScramCredentials(upsertions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion],
+                                deletions: 
Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): 
AlterUserScramCredentialsResponseData = {
+
+    def scramMechanism(mechanism: Byte): ScramMechanism = {
+      ScramMechanism.fromType(mechanism)
+    }
+
+    def mechanismName(mechanism: Byte): String = {
+      scramMechanism(mechanism).getMechanismName
+    }
+
+    val retval = new AlterUserScramCredentialsResponseData()
+
+    // fail any user that is invalid due to an empty user name, an unknown 
SCRAM mechanism, or not enough iterations
+    val invalidUsers = (
+      upsertions.filter(upsertion => {
+        if (upsertion.name.isEmpty)
+          true
+        else {
+          val publicScramMechanism = scramMechanism(upsertion.mechanism)
+          publicScramMechanism == ScramMechanism.UNKNOWN ||
+            (upsertion.iterations != -1 &&
+              
InternalScramMechanism.forMechanismName(publicScramMechanism.getMechanismName).minIterations
 > upsertion.iterations)

Review comment:
       Shouldn't we also limit the max value for iterations?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to