dajac commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r464241946
########## File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ########## @@ -34,6 +34,8 @@ private final boolean expectResponse; private final int requestTimeoutMs; private final RequestCompletionHandler callback; + private final String initialPrincipalName; + private final String initialClientId; Review comment: Could we use Optional for these two as they are not always provided? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; + +import java.util.Collection; +import java.util.Map; + +public class AlterConfigsUtil { + + public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs, + final boolean validateOnly) { + return generateIncrementalRequestData(configs.keySet(), configs, validateOnly); + } + + public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources, + final Map<ConfigResource, Collection<AlterConfigOp>> configs, + final boolean validateOnly) { + IncrementalAlterConfigsRequestData data = new IncrementalAlterConfigsRequestData() + .setValidateOnly(validateOnly); + for (ConfigResource resource : resources) { + IncrementalAlterConfigsRequestData.AlterableConfigCollection alterableConfigSet = + new IncrementalAlterConfigsRequestData.AlterableConfigCollection(); + for (AlterConfigOp configEntry : configs.get(resource)) + alterableConfigSet.add(new IncrementalAlterConfigsRequestData.AlterableConfig() + .setName(configEntry.configEntry().name()) + .setValue(configEntry.configEntry().value()) + .setConfigOperation(configEntry.opType().id())); + IncrementalAlterConfigsRequestData.AlterConfigsResource alterConfigsResource = new IncrementalAlterConfigsRequestData.AlterConfigsResource(); + alterConfigsResource.setResourceType(resource.type().id()) + .setResourceName(resource.name()).setConfigs(alterableConfigSet); + data.resources().add(alterConfigsResource); + Review comment: nit: empty line could be removed. ########## File path: clients/src/main/resources/common/message/RequestHeader.json ########## @@ -37,6 +37,12 @@ // Since the client is sending the ApiVersionsRequest in order to discover what // versions are supported, the client does not know the best version to use. { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, - "flexibleVersions": "none", "about": "The client ID string." } + "flexibleVersions": "none", "about": "The client ID string." }, + { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+", + "nullableVersions": "2+", "default": "null", "ignorable": true, + "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging purpose." }, Review comment: Actually, we will also use it for quota. I think that we could say that both `InitialPrincipalName` and `InitialClientId` will be used for logging and quota purposes. ########## File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala ########## @@ -145,7 +147,9 @@ abstract class InterBrokerSendThread(name: String, case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], - handler: RequestCompletionHandler) + handler: RequestCompletionHandler, + initialPrincipalName: String = null, + initialClientId: String = null) Review comment: Shall we use Option here? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() Review comment: It looks like that we will propagate the `NOT_CONTROLLER` error back to the client. Is it intentional? As clients don't send this request to the controller (and new ones won't get the controller id anymore), it sounds weird to return them this error. We could perhaps return another generic error. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.incrementalAlterConfigs( + authorizedResources, incrementalAlterConfigsRequest.data.validateOnly) + + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder( + AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map { + case (resource, ops) => resource -> ops.asJavaCollection + }.asJava, incrementalAlterConfigsRequest.data().validateOnly())) + + brokerToControllerChannelManager.sendRequest(redirectRequestBuilder, + new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request, + unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) + }.toMap), Review comment: Have we considered using Scala functions as callbacks? It would be more aligned with the other callbacks that we have in Scala and also would avoid having to define classes for each handler that support forwarding. What do you think? ########## File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ########## @@ -51,7 +55,9 @@ public ClientRequest(String destination, long createdTimeMs, boolean expectResponse, int requestTimeoutMs, - RequestCompletionHandler callback) { + RequestCompletionHandler callback, Review comment: nit: I would actually keep the callback as the last argument as it is a bit more natural to have the callback last. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ########## @@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) { } public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) { - this(new RequestHeaderData(). - setRequestApiKey(requestApiKey.id). - setRequestApiVersion(requestVersion). - setClientId(clientId). - setCorrelationId(correlationId), + this(requestApiKey, requestVersion, clientId, correlationId, null, null); + } + + public RequestHeader(ApiKeys requestApiKey, + short requestVersion, + String clientId, + int correlationId, + String initialPrincipalName, + String initialClientId) { Review comment: Shall we use Optional here as well? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java ########## @@ -47,13 +48,15 @@ public RequestContext(RequestHeader header, KafkaPrincipal principal, ListenerName listenerName, SecurityProtocol securityProtocol, - ClientInformation clientInformation) { + ClientInformation clientInformation, + boolean fromControlPlane) { this.header = header; this.connectionId = connectionId; this.clientAddress = clientAddress; this.principal = principal; this.listenerName = listenerName; this.securityProtocol = securityProtocol; + this.fromControlPlane = fromControlPlane; Review comment: nit: Could we move it after `clientInformation` to keep the order inline with the order in the constructor? ########## File path: core/src/main/scala/kafka/api/ApiVersion.scala ########## @@ -100,7 +100,9 @@ object ApiVersion { // Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) KAFKA_2_6_IV0, // Introduced feature versioning support (KIP-584) - KAFKA_2_7_IV0 + KAFKA_2_7_IV0, + // Introduced redirection support (KIP-590) + KAFKA_2_7_IV1 Review comment: As 2.7 has not be release yet, we don't need to introduce a new version. We can reuse `KAFKA_2_7_IV0`. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java ########## @@ -71,12 +71,12 @@ public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) { Objects.requireNonNull(configs, "configs"); for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) { AlterConfigsRequestData.AlterConfigsResource resource = new AlterConfigsRequestData.AlterConfigsResource() - .setResourceName(entry.getKey().name()) - .setResourceType(entry.getKey().type().id()); + .setResourceName(entry.getKey().name()) + .setResourceType(entry.getKey().type().id()); Review comment: I personally prefer the previous indentation which is, I believe, more common in our code base. Or do we plan to adopt a new formatting? ########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -309,7 +310,10 @@ object RequestChannel extends Logging { } } -class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup { +class RequestChannel(val queueSize: Int, + val metricNamePrefix : String, Review comment: nit: That was already present before your change but could we remove the extra space before the colon? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.incrementalAlterConfigs( + authorizedResources, incrementalAlterConfigsRequest.data.validateOnly) + + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder( + AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map { Review comment: nit: Remove extra space before `authorizedResources`. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -458,7 +459,7 @@ class AclAuthorizer extends Authorizer with Logging { val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType val refCount = action.resourceReferenceCount - s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount" + s"[Principal = $principal, Initial Principal Name = $initialPrincipalName]: is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount" Review comment: The usage of the square brackets and the colon looks weird here. The audit log does not look like a sentence anymore. I wonder if we could go with something like this instead: `Principal = A on behalf of Principal = B is allowed...`. We could also put the initial principal name only if it is set. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { Review comment: For my understanding, I suppose that we don't verify that redirection is enabled here to ensure that the controller can accept forwarded requests as soon as one broker in the cluster is configured with IBP 2.7. Am I getting this right? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.incrementalAlterConfigs( + authorizedResources, incrementalAlterConfigsRequest.data.validateOnly) + + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder( + AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map { + case (resource, ops) => resource -> ops.asJavaCollection + }.asJava, incrementalAlterConfigsRequest.data().validateOnly())) + + brokerToControllerChannelManager.sendRequest(redirectRequestBuilder, + new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request, + unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) + }.toMap), + request.header.initialPrincipalName, + request.header.initialClientId) + } else { + // When IBP is low, we would just handle the config request even if we are not the controller, + // as admin client doesn't know how to find the controller. Review comment: nit: `as admin client doesn't know how to find the controller` is not relevant anymore. What about the following: `When IBP is smaller than XYZ, forwarding is not supported therefore requests are handled directly`? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2982,12 +3091,33 @@ class KafkaApis(val requestChannel: RequestChannel, logIfDenied: Boolean = true, refCount: Int = 1): Boolean = { authorizer.forall { authZ => - val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) - val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied)) - authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED + if (authorizeAction(requestContext, operation, + resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) { + true + } else { + operation match { + case ALTER | ALTER_CONFIGS | CREATE | DELETE => + requestContext.fromControlPlane && Review comment: * I presume that this does not work if we use the same listener for bother the control plane and the data plane. * I also wonder if it is a good thing to have this extension here as it applies to all the authorization in the Api Layer. I think that we should be cautious and only do this for forwarded requests. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = { val alterConfigsRequest = request.body[AlterConfigsRequest] - val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) => + val requestResources = alterConfigsRequest.configs.asScala.toMap + + val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) => resource.`type` match { case ConfigResource.Type.BROKER_LOGGER => - throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") + throw new InvalidRequestException( + s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") case ConfigResource.Type.BROKER => authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } - val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new AlterConfigsResponse(results.asJava, requestThrottleMs)) } - def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = { - val data = new AlterConfigsResponseData() - .setThrottleTimeMs(requestThrottleMs) - (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) => - data.responses().add(new AlterConfigsResourceResponse() - .setErrorCode(error.error.code) - .setErrorMessage(error.message) - .setResourceName(resource.name) - .setResourceType(resource.`type`.id)) + + def notControllerResponse(): Unit = { + val errorResult = requestResources.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.alterConfigs( + authorizedResources, alterConfigsRequest.validateOnly) + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new AlterConfigsRequest.Builder( + authorizedResources.asJava, alterConfigsRequest.validateOnly()) + + brokerToControllerChannelManager.sendRequest(redirectRequestBuilder, + new ForwardedAlterConfigsRequestCompletionHandler(request, + unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) + }.toMap), + request.header.initialPrincipalName, + request.header.initialClientId) + } else { + // When IBP is low, we would just handle the config request, as admin client doesn't know + // how to find the controller. + val authorizedResult = adminManager.alterConfigs( + authorizedResources, alterConfigsRequest.validateOnly) + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) } - new AlterConfigsResponse(data) + + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } + + private def isForwardingRequest(request: RequestChannel.Request): Boolean = { + request.header.initialPrincipalName != null && + request.header.initialClientId != null && + request.context.fromControlPlane Review comment: I presume that this does not work if the broker uses the same listener for the control plane and the data plane. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org