hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493823776



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       As discussed offline, we can pass the expected version down to the 
Builder. The abstract builder already supports an explicit range of versions. 
In any case, it doesn't seem like we have a choice.
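
A minimal sketch of what "passing the expected version down" could look like, assuming a builder that takes an explicit version range. `SketchBuilder` and `pinnedTo` are hypothetical stand-ins written for illustration; they are not Kafka's actual classes:

```scala
object PinnedVersionSketch {
  // Hypothetical stand-in for a request builder that accepts an explicit
  // range of allowed versions (not the real Kafka builder).
  final case class SketchBuilder(oldestAllowedVersion: Short, latestAllowedVersion: Short) {
    // True when the given version falls inside the allowed range.
    def supports(version: Short): Boolean =
      version >= oldestAllowedVersion && version <= latestAllowedVersion
  }

  // Pin the forwarded request to the version the client originally used by
  // collapsing the allowed range to that single version.
  def pinnedTo(clientVersion: Short): SketchBuilder =
    SketchBuilder(clientVersion, clientVersion)
}
```

Collapsing the range to one version means the forwarded request cannot be silently rebuilt at a different version than the one the client sent.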
   
   By the way, one potential edge case here is that the broker receiving the 
request has been upgraded to a later version than the controller. This is 
possible in the middle of a rolling upgrade. I don't think there's an easy way 
to handle this. We could return UNSUPPORTED_VERSION to the client, but that 
would be surprising since the client chose a supported API version based on 
ApiVersions and is not aware of the controller redirection.
   
   One idea to address this problem is to gate version upgrades of redirectable 
APIs on the IBP (inter-broker protocol version). Basically, all of these APIs 
have become inter-broker APIs through redirection, so they need the safeguard 
of the IBP. It feels like we might have to do this.
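
A rough sketch of the gating idea, under the assumption that each version of a redirectable API records the lowest IBP at which it may be used. `RedirectableApi`, `minIbpByVersion`, `maxAdvertisedVersion`, and the integer IBP levels are all hypothetical and only illustrate the shape of the check:

```scala
object IbpGateSketch {
  // Hypothetical model: for each API version, the lowest IBP level
  // (a plain Int here, for simplicity) at which that version may be used.
  final case class RedirectableApi(name: String, minIbpByVersion: Map[Short, Int])

  // Highest version a broker could advertise at the current IBP, or None
  // if no version of the API is allowed yet.
  def maxAdvertisedVersion(api: RedirectableApi, currentIbp: Int): Option[Short] = {
    val allowed = api.minIbpByVersion.collect {
      case (version, minIbp) if minIbp <= currentIbp => version
    }
    if (allowed.isEmpty) None else Some(allowed.max)
  }
}
```

With a gate like this, an upgraded broker would keep advertising the older version until the IBP is bumped, so a client would not pick a version the controller cannot yet handle.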





