rajinisivaram commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r468632388
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) { + final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> + new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; + Errors error = Errors.forCode(response.data().error()); + switch (error) { + case NONE: + DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, + userScramCredential -> { + List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map( + credentialInfo -> new ScramCredentialInfo(ScramMechanism.fromType(credentialInfo.mechanism()), credentialInfo.iterations())) + .collect(Collectors.toList()); + return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); + }))); + break; + case NOT_CONTROLLER: + handleNotControllerError(error); + break; + default: + future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); + break; + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new DescribeUserScramCredentialsResult(future); + } + + @Override + public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations, + AlterUserScramCredentialsOptions options) { + final long now = time.milliseconds(); + final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(); + for (UserScramCredentialAlteration alteration: alterations) { + futures.put(alteration.getUser(), new KafkaFutureImpl<>()); + } + final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>(); + // We need to keep track of users with deletions of an unknown SCRAM mechanism + alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { + UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; + ScramMechanism mechanism = deletion.getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { + userIllegalAlterationExceptions.put(deletion.getUser(), new InvalidRequestException("Unknown SCRAM mechanism")); + } + }); + // Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, + // so keep track of which users are affected by such a failure and immediately fail all their alterations + final Map<String, Map<ScramMechanism, AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions = new HashMap<>(); + alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser())) + .forEach(alteration -> { + UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; + String user = upsertion.getUser(); + try { + ScramMechanism mechanism = upsertion.getInfo().getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) + throw new InvalidRequestException("Unknown SCRAM mechanism"); + userInsertions.putIfAbsent(user, new HashMap<>()); + userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); + } catch (Exception e) { + // we might overwrite an exception from a previous upsertion, but we don't really care + // since we just need to mark this user as having at least one illegal alteration + // and make an exception instance available for completing the corresponding future exceptionally + userIllegalAlterationExceptions.put(user, e); + } + }); + // fail any users immediately that have an illegal alteration as identified above + userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> { + futures.get(entry.getKey()).completeExceptionally(entry.getValue()); + }); + + // submit alterations for users that do not have an illegal upsertion as identified above + Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.getUser())) + .map(a -> userInsertions.get(a.getUser()).get(((UserScramCredentialUpsertion) a).getInfo().getMechanism())) + .collect(Collectors.toList())) + .setDeletions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialDeletion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.getUser())) + .map(d -> + getScramCredentialDeletion((UserScramCredentialDeletion) d)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + AlterUserScramCredentialsResponse response = (AlterUserScramCredentialsResponse) abstractResponse; + // Check for controller change + for (Errors error : response.errorCounts().keySet()) { + if (error == Errors.NOT_CONTROLLER) { + handleNotControllerError(error); + } + } + response.data().results().forEach(result -> { + KafkaFutureImpl<Void> future = futures.get(result.user()); + if (future == null) { + log.warn("Server response mentioned unknown user {}", result.user()); + } else { + Errors error = Errors.forCode(result.errorCode()); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception(result.errorMessage())); + } else { + future.complete(null); + } + } + }); + completeUnrealizedFutures( + futures.entrySet().stream(), + user -> "The broker response did not contain a result for user " + user); + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }; + runnable.call(call, now); + return new AlterUserScramCredentialsResult(new HashMap<>(futures)); + } + + private static AlterUserScramCredentialsRequestData.ScramCredentialUpsertion getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws InvalidKeyException, NoSuchAlgorithmException { + AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion(); + return retval.setName(u.getUser()) + .setMechanism(u.getInfo().getMechanism().getType()) + .setIterations(u.getInfo().getIterations()) + .setSalt(u.getSalt()) + .setSaltedPassword(getSaltedPasword(u.getInfo().getMechanism(), u.getPassword(), u.getSalt(), u.getInfo().getIterations())); + } + + private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getScramCredentialDeletion(UserScramCredentialDeletion d) { + return new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(d.getUser()).setMechanism(d.getMechanism().getType()); + } + + private static byte[] getSaltedPasword(ScramMechanism publicScramMechanism, byte[] password, byte[] salt, int iterations) throws NoSuchAlgorithmException, InvalidKeyException { + return new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.getMechanismName())) + .hi(password, salt, iterations); Review comment: Agree that we should just get rid of -1. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org