abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r464567004
########## File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ########## @@ -34,6 +34,8 @@ private final boolean expectResponse; private final int requestTimeoutMs; private final RequestCompletionHandler callback; + private final String initialPrincipalName; + private final String initialClientId; Review comment: It is not necessary as we don't check nulls for these fields. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = { val alterConfigsRequest = request.body[AlterConfigsRequest] - val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) => + val requestResources = alterConfigsRequest.configs.asScala.toMap + + val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) => resource.`type` match { case ConfigResource.Type.BROKER_LOGGER => - throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") + throw new InvalidRequestException( + s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") case ConfigResource.Type.BROKER => authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } - val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => 
+ new AlterConfigsResponse(results.asJava, requestThrottleMs)) } - def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = { - val data = new AlterConfigsResponseData() - .setThrottleTimeMs(requestThrottleMs) - (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) => - data.responses().add(new AlterConfigsResourceResponse() - .setErrorCode(error.error.code) - .setErrorMessage(error.message) - .setResourceName(resource.name) - .setResourceType(resource.`type`.id)) + + def notControllerResponse(): Unit = { + val errorResult = requestResources.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.alterConfigs( + authorizedResources, alterConfigsRequest.validateOnly) + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new AlterConfigsRequest.Builder( + authorizedResources.asJava, alterConfigsRequest.validateOnly()) + + brokerToControllerChannelManager.sendRequest(redirectRequestBuilder, + new ForwardedAlterConfigsRequestCompletionHandler(request, + unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) + }.toMap), + request.header.initialPrincipalName, + request.header.initialClientId) + } else { + // When IBP is low, we would just handle the config request, as admin client doesn't know + // how to find the controller. 
+ val authorizedResult = adminManager.alterConfigs( + authorizedResources, alterConfigsRequest.validateOnly) + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) } - new AlterConfigsResponse(data) + + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } + + private def isForwardingRequest(request: RequestChannel.Request): Boolean = { + request.header.initialPrincipalName != null && + request.header.initialClientId != null && + request.context.fromControlPlane Review comment: Will requests only flow to data plane if they use the same listener? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { Review comment: Yes, the purpose is to always handle a forwarding request even if IBP is not 2.7 yet. This is because some brokers may already upgrade their IBP and they start sending forwarding requests, which is totally legitimate. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.incrementalAlterConfigs( + authorizedResources, incrementalAlterConfigsRequest.data.validateOnly) + + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. 
+ val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder( + AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map { + case (resource, ops) => resource -> ops.asJavaCollection + }.asJava, incrementalAlterConfigsRequest.data().validateOnly())) + + brokerToControllerChannelManager.sendRequest(redirectRequestBuilder, + new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request, + unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) + }.toMap), + request.header.initialPrincipalName, + request.header.initialClientId) + } else { + // When IBP is low, we would just handle the config request even if we are not the controller, + // as admin client doesn't know how to find the controller. Review comment: Sg! ########## File path: clients/src/main/resources/common/message/RequestHeader.json ########## @@ -37,6 +37,12 @@ // Since the client is sending the ApiVersionsRequest in order to discover what // versions are supported, the client does not know the best version to use. { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, - "flexibleVersions": "none", "about": "The client ID string." } + "flexibleVersions": "none", "about": "The client ID string." }, + { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+", + "nullableVersions": "2+", "default": "null", "ignorable": true, + "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging purpose." }, Review comment: I don't think we need initial client id for audit logging, is there some other logging you have in mind? 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ########## @@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) { } public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) { - this(new RequestHeaderData(). - setRequestApiKey(requestApiKey.id). - setRequestApiVersion(requestVersion). - setClientId(clientId). - setCorrelationId(correlationId), + this(requestApiKey, requestVersion, clientId, correlationId, null, null); + } + + public RequestHeader(ApiKeys requestApiKey, + short requestVersion, + String clientId, + int correlationId, + String initialPrincipalName, + String initialClientId) { Review comment: Not necessary, as explained. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava)) + } + + def notControllerResponse(): Unit = { + val errorResult = configs.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() Review comment: Note that this is propagating to the sender broker. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org