dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r464241946



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -34,6 +34,8 @@
     private final boolean expectResponse;
     private final int requestTimeoutMs;
     private final RequestCompletionHandler callback;
+    private final String initialPrincipalName;
+    private final String initialClientId;

Review comment:
       Could we use Optional for these two as they are not always provided?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Map<ConfigResource, 
Collection<AlterConfigOp>> configs,
+                                                                               
     final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, 
validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Collection<ConfigResource> resources,
+                                                                               
     final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                               
     final boolean validateOnly) {
+        IncrementalAlterConfigsRequestData data = new 
IncrementalAlterConfigsRequestData()
+                                                      
.setValidateOnly(validateOnly);
+        for (ConfigResource resource : resources) {
+            IncrementalAlterConfigsRequestData.AlterableConfigCollection 
alterableConfigSet =
+                new 
IncrementalAlterConfigsRequestData.AlterableConfigCollection();
+            for (AlterConfigOp configEntry : configs.get(resource))
+                alterableConfigSet.add(new 
IncrementalAlterConfigsRequestData.AlterableConfig()
+                                           
.setName(configEntry.configEntry().name())
+                                           
.setValue(configEntry.configEntry().value())
+                                           
.setConfigOperation(configEntry.opType().id()));
+            IncrementalAlterConfigsRequestData.AlterConfigsResource 
alterConfigsResource = new 
IncrementalAlterConfigsRequestData.AlterConfigsResource();
+            alterConfigsResource.setResourceType(resource.type().id())
+                
.setResourceName(resource.name()).setConfigs(alterableConfigSet);
+            data.resources().add(alterConfigsResource);
+

Review comment:
       nit: empty line could be removed.

##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,6 +37,12 @@
     // Since the client is sending the ApiVersionsRequest in order to discover 
what
     // versions are supported, the client does not know the best version to 
use.
     { "name": "ClientId", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." }
+      "flexibleVersions": "none", "about": "The client ID string." },
+    { "name": "InitialPrincipalName", "type": "string", "tag": 0, 
"taggedVersions": "2+",
+      "nullableVersions": "2+", "default": "null", "ignorable": true,
+      "about": "Optional value of the initial principal name when the request 
is redirected by a broker, for audit logging purpose." },

Review comment:
       Actually, we will also use it for quota. I think that we could say that 
both `InitialPrincipalName` and `InitialClientId` will be used for logging and 
quota purposes.

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -145,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
                                        request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-                                       handler: RequestCompletionHandler)
+                                       handler: RequestCompletionHandler,
+                                       initialPrincipalName: String = null,
+                                       initialClientId: String = null)

Review comment:
       Shall we use Option here?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()

Review comment:
       It looks like we will propagate the `NOT_CONTROLLER` error back to 
the client. Is this intentional? As clients don't send this request to the 
controller (and new ones won't get the controller id anymore), it seems odd 
to return this error to them. We could perhaps return another, more generic error.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, 
incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( 
authorizedResources.map {
+          case (resource, ops) => resource -> ops.asJavaCollection
+        }.asJava, incrementalAlterConfigsRequest.data().validateOnly()))
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),

Review comment:
       Have we considered using Scala functions as callbacks? It would be more 
aligned with the other callbacks that we have in Scala and would also avoid 
having to define a class for each handler that supports forwarding. What do you 
think?

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -51,7 +55,9 @@ public ClientRequest(String destination,
                          long createdTimeMs,
                          boolean expectResponse,
                          int requestTimeoutMs,
-                         RequestCompletionHandler callback) {
+                         RequestCompletionHandler callback,

Review comment:
       nit: I would actually keep the callback as the last argument as it is a 
bit more natural to have the callback last.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String 
clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, 
null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,
+                         short requestVersion,
+                         String clientId,
+                         int correlationId,
+                         String initialPrincipalName,
+                         String initialClientId) {

Review comment:
       Shall we use Optional here as well?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##########
@@ -47,13 +48,15 @@ public RequestContext(RequestHeader header,
                           KafkaPrincipal principal,
                           ListenerName listenerName,
                           SecurityProtocol securityProtocol,
-                          ClientInformation clientInformation) {
+                          ClientInformation clientInformation,
+                          boolean fromControlPlane) {
         this.header = header;
         this.connectionId = connectionId;
         this.clientAddress = clientAddress;
         this.principal = principal;
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
+        this.fromControlPlane = fromControlPlane;

Review comment:
       nit: Could we move it after `clientInformation` to keep the order in line 
with the order in the constructor?

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -100,7 +100,9 @@ object ApiVersion {
     // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
     KAFKA_2_6_IV0,
     // Introduced feature versioning support (KIP-584)
-    KAFKA_2_7_IV0
+    KAFKA_2_7_IV0,
+    // Introduced redirection support (KIP-590)
+    KAFKA_2_7_IV1

Review comment:
       As 2.7 has not been released yet, we don't need to introduce a new version. 
We can reuse `KAFKA_2_7_IV0`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##########
@@ -71,12 +71,12 @@ public Builder(Map<ConfigResource, Config> configs, boolean 
validateOnly) {
             Objects.requireNonNull(configs, "configs");
             for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) 
{
                 AlterConfigsRequestData.AlterConfigsResource resource = new 
AlterConfigsRequestData.AlterConfigsResource()
-                        .setResourceName(entry.getKey().name())
-                        .setResourceType(entry.getKey().type().id());
+                                                                            
.setResourceName(entry.getKey().name())
+                                                                            
.setResourceType(entry.getKey().type().id());

Review comment:
       I personally prefer the previous indentation which is, I believe, more 
common in our code base. Or do we plan to adopt a new formatting?

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -309,7 +310,10 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: 
Time) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int,
+                     val metricNamePrefix : String,

Review comment:
       nit: That was already present before your change but could we remove the 
extra space before the colon?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, 
incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( 
authorizedResources.map {

Review comment:
       nit: Remove extra space before `authorizedResources`.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -458,7 +459,7 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) 
ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation from host 
= $host on resource = $resource for request = $apiKey with resourceRefCount = 
$refCount"
+      s"[Principal = $principal, Initial Principal Name = 
$initialPrincipalName]: is $authResult Operation = $operation from host = $host 
on resource = $resource for request = $apiKey with resourceRefCount = $refCount"

Review comment:
       The usage of the square brackets and the colon looks weird here. The 
audit log does not look like a sentence anymore. I wonder if we could go with 
something like this instead: `Principal = A on behalf of Principal = B is 
allowed...`. We could also put the initial principal name only if it is set.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {

Review comment:
       For my understanding, I suppose that we don't verify that redirection is 
enabled here to ensure that the controller can accept forwarded requests as 
soon as one broker in the cluster is configured with IBP 2.7. Am I getting this 
right?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, 
incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( 
authorizedResources.map {
+          case (resource, ops) => resource -> ops.asJavaCollection
+        }.asJava, incrementalAlterConfigsRequest.data().validateOnly()))
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request even if we 
are not the controller,
+      // as admin client doesn't know how to find the controller.

Review comment:
       nit: `as admin client doesn't know how to find the controller` is not 
relevant anymore. What about the following: `When IBP is smaller than XYZ, 
forwarding is not supported therefore requests are handled directly`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2982,12 +3091,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, 
PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, 
refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == 
AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, 
authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>
+            requestContext.fromControlPlane &&

Review comment:
       * I presume that this does not work if we use the same listener for 
both the control plane and the data plane.
   * I also wonder if it is a good thing to have this extension here as it 
applies to all the authorization in the Api Layer. I think that we should be 
cautious and only do this for forwarded requests.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = 
alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    val (authorizedResources, unauthorizedResources) = 
requestResources.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and 
does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+          throw new InvalidRequestException(
+            s"AlterConfigs is deprecated and does not support the resource 
type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, 
alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterConfigsResponse(results.asJava, requestThrottleMs))
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) 
=>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
+
+    def notControllerResponse(): Unit = {
+      val errorResult = requestResources.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.alterConfigs(
+          authorizedResources, alterConfigsRequest.validateOnly)
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+        authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request, as admin 
client doesn't know
+      // how to find the controller.
+      val authorizedResult = adminManager.alterConfigs(
+        authorizedResources, alterConfigsRequest.validateOnly)
+      val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+        resource -> configsAuthorizationApiError(resource)
       }
-      new AlterConfigsResponse(data)
+
+      sendResponseCallback(authorizedResult ++ unauthorizedResult)
+    }
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+    request.header.initialPrincipalName != null &&
+      request.header.initialClientId != null &&
+      request.context.fromControlPlane

Review comment:
       I presume that this does not work if the broker uses the same listener 
for the control plane and the data plane.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to