abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r464567004



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -34,6 +34,8 @@
     private final boolean expectResponse;
     private final int requestTimeoutMs;
     private final RequestCompletionHandler callback;
+    private final String initialPrincipalName;
+    private final String initialClientId;

Review comment:
       It is not necessary as we don't check nulls for these fields.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = 
alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    val (authorizedResources, unauthorizedResources) = 
requestResources.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and 
does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+          throw new InvalidRequestException(
+            s"AlterConfigs is deprecated and does not support the resource 
type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, 
alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterConfigsResponse(results.asJava, requestThrottleMs))
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) 
=>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
+
+    def notControllerResponse(): Unit = {
+      val errorResult = requestResources.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.alterConfigs(
+          authorizedResources, alterConfigsRequest.validateOnly)
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+        authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request, as admin 
client doesn't know
+      // how to find the controller.
+      val authorizedResult = adminManager.alterConfigs(
+        authorizedResources, alterConfigsRequest.validateOnly)
+      val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+        resource -> configsAuthorizationApiError(resource)
       }
-      new AlterConfigsResponse(data)
+
+      sendResponseCallback(authorizedResult ++ unauthorizedResult)
+    }
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+    request.header.initialPrincipalName != null &&
+      request.header.initialClientId != null &&
+      request.context.fromControlPlane

Review comment:
       Will requests only flow to data plane if they use the same listener?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {

Review comment:
       Yes, the purpose is to always handle a forwarding request even if the IBP is 
not 2.7 yet. This is because some brokers may have already upgraded their IBP and 
started sending forwarding requests, which is totally legitimate.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, 
incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( 
authorizedResources.map {
+          case (resource, ops) => resource -> ops.asJavaCollection
+        }.asJava, incrementalAlterConfigsRequest.data().validateOnly()))
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request even if we 
are not the controller,
+      // as admin client doesn't know how to find the controller.

Review comment:
       Sounds good!

##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,6 +37,12 @@
     // Since the client is sending the ApiVersionsRequest in order to discover 
what
     // versions are supported, the client does not know the best version to 
use.
     { "name": "ClientId", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." }
+      "flexibleVersions": "none", "about": "The client ID string." },
+    { "name": "InitialPrincipalName", "type": "string", "tag": 0, 
"taggedVersions": "2+",
+      "nullableVersions": "2+", "default": "null", "ignorable": true,
+      "about": "Optional value of the initial principal name when the request 
is redirected by a broker, for audit logging purpose." },

Review comment:
       I don't think we need the initial client id for audit logging. Is there some 
other logging you have in mind?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String 
clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, 
null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,
+                         short requestVersion,
+                         String clientId,
+                         int correlationId,
+                         String initialPrincipalName,
+                         String initialClientId) {

Review comment:
       Not necessary, as explained.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()

Review comment:
       Note that this is propagating to the sender broker.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to