dajac commented on a change in pull request #8933: URL: https://github.com/apache/kafka/pull/8933#discussion_r446176850
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1684,25 +1685,29 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = { + // Since version 6 of the API, the quota is strictly enforced. Any topic creation + // above the quota is not allowed and rejected with a THROTTLING_QUOTA_EXCEEDED error. + val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, 6) Review comment: I have introduced a new construct here in order to make the integration as lightweight as possible: `ControllerMutationQuota`. It is a small wrapper around the `Sensor` of a given user/clientId which wraps the quota enforcement and the computation of the throttle time. Three implementations are available: `StrictControllerMutationQuota` that is used for new clients, `PermissiveControllerMutationQuota` that is used for old clients, and `UnboundedControllerMutationQuota` that is used when no quota is set. The `ControllerMutationQuota` object is then passed to the `AdminManager` to enforce the quota and later used to compute the final throttle time before sending the response. At this point, the request quota is also computed and the highest one is used. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org