dajac commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r465273597
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = { val alterConfigsRequest = request.body[AlterConfigsRequest] - val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) => + val requestResources = alterConfigsRequest.configs.asScala.toMap + + val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) => resource.`type` match { case ConfigResource.Type.BROKER_LOGGER => - throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") + throw new InvalidRequestException( + s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") case ConfigResource.Type.BROKER => authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } - val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + + def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + new AlterConfigsResponse(results.asJava, requestThrottleMs)) } - def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = { - val data = new AlterConfigsResponseData() - .setThrottleTimeMs(requestThrottleMs) - (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) => - data.responses().add(new AlterConfigsResourceResponse() - .setErrorCode(error.error.code) - .setErrorMessage(error.message) - .setResourceName(resource.name) - .setResourceType(resource.`type`.id)) + + def notControllerResponse(): Unit = { + val errorResult = requestResources.keys.map { + resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null) + }.toMap + + sendResponseCallback(errorResult) + } + + if (isForwardingRequest(request)) { + if (!controller.isActive) { + notControllerResponse() + } else { + val authorizedResult = adminManager.alterConfigs( + authorizedResources, alterConfigsRequest.validateOnly) + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + } + + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } else if (!controller.isActive && config.redirectionEnabled) { + val redirectRequestBuilder = new AlterConfigsRequest.Builder( + authorizedResources.asJava, alterConfigsRequest.validateOnly()) + + brokerToControllerChannelManager.sendRequest(redirectRequestBuilder, + new ForwardedAlterConfigsRequestCompletionHandler(request, + unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) + }.toMap), + request.header.initialPrincipalName, + request.header.initialClientId) + } else { + // When IBP is low, we would just handle the config request, as admin client doesn't know + // how to find the controller. + val authorizedResult = adminManager.alterConfigs( + authorizedResources, alterConfigsRequest.validateOnly) + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(resource) } - new AlterConfigsResponse(data) + + sendResponseCallback(authorizedResult ++ unauthorizedResult) + } + } + + private def isForwardingRequest(request: RequestChannel.Request): Boolean = { + request.header.initialPrincipalName != null && + request.header.initialClientId != null && + request.context.fromControlPlane Review comment: Sorry, I was not clear. If the control plane listener is not configured, control requests will go to the data plane listener. Based on your last commits, it seems that you have figured that out. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org