abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r509670268
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined && !controller.isActive) {
+      sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+    } else if (!controller.isActive && couldDoRedirection(request)) {
+      redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def couldDoRedirection(request: RequestChannel.Request): Boolean =

Review comment:
Sounds good.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org