dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465273597



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+          throw new InvalidRequestException(
+            s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterConfigsResponse(results.asJava, requestThrottleMs))
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) =>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
+
+    def notControllerResponse(): Unit = {
+      val errorResult = requestResources.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.alterConfigs(
+          authorizedResources, alterConfigsRequest.validateOnly)
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+        authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request, as admin client doesn't know
+      // how to find the controller.
+      val authorizedResult = adminManager.alterConfigs(
+        authorizedResources, alterConfigsRequest.validateOnly)
+      val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+        resource -> configsAuthorizationApiError(resource)
       }
-      new AlterConfigsResponse(data)
+
+      sendResponseCallback(authorizedResult ++ unauthorizedResult)
+    }
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+    request.header.initialPrincipalName != null &&
+      request.header.initialClientId != null &&
+      request.context.fromControlPlane

Review comment:
Sorry, I was not clear. If the control plane listener is not configured,
control requests will go to the data plane listener instead. Based on your
latest commits, it seems that you have already figured that out.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to