dajac commented on a change in pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#discussion_r446176850



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1684,25 +1685,29 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
+    // Since version 6 of the API, the quota is strictly enforced. Any topic creation
+    // above the quota is not allowed and rejected with a THROTTLING_QUOTA_EXCEEDED error.
+    val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, 6)

Review comment:
   I have introduced a new construct here to keep the integration as
lightweight as possible: `ControllerMutationQuota`. It is a small wrapper
around the `Sensor` of a given user/clientId that encapsulates both the quota
enforcement and the computation of the throttle time. Three implementations
are available: `StrictControllerMutationQuota`, used for new clients;
`PermissiveControllerMutationQuota`, used for old clients; and
`UnboundedControllerMutationQuota`, used when no quota is set.

   The `ControllerMutationQuota` object is then passed to the `AdminManager`
to enforce the quota, and is later used to compute the final throttle time
before the response is sent. At that point, the throttle time for the request
quota is also computed, and the higher of the two throttle times is used.
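
   To make the shape of this construct easier to picture, here is a minimal,
self-contained sketch. It is not the code from the PR: `SimpleSensor` is a
hypothetical stand-in for Kafka's `Sensor`, `QuotaSketch` and its method
signatures are invented for illustration, and the linear throttle-time formula
is an assumption. The only things taken from the comment above are the three
implementation names, the version-based choice between strict and permissive,
and taking the higher of the two throttle times.

```scala
// Illustrative sketch only; names other than the three quota classes are assumed.

final class ThrottlingQuotaExceededException(val throttleTimeMs: Int)
  extends RuntimeException(s"Quota exceeded, retry after $throttleTimeMs ms")

// Hypothetical stand-in for a Sensor: counts mutations against a fixed budget.
final class SimpleSensor(val limit: Double, val msPerPermit: Int) {
  private var used = 0.0
  def record(permits: Double): Unit = used += permits
  // Assumed formula: throttle time grows linearly with the overshoot.
  def throttleTimeMs: Int =
    if (used <= limit) 0 else math.ceil((used - limit) * msPerPermit).toInt
}

trait ControllerMutationQuota {
  def record(permits: Double): Unit
  def throttleTimeMs: Int
}

// No quota configured: accept everything and never throttle.
object UnboundedControllerMutationQuota extends ControllerMutationQuota {
  def record(permits: Double): Unit = ()
  def throttleTimeMs: Int = 0
}

// Old clients: always accept the mutation, only report a throttle time.
final class PermissiveControllerMutationQuota(sensor: SimpleSensor)
  extends ControllerMutationQuota {
  def record(permits: Double): Unit = sensor.record(permits)
  def throttleTimeMs: Int = sensor.throttleTimeMs
}

// New clients: reject the mutation as soon as the budget is exceeded.
final class StrictControllerMutationQuota(sensor: SimpleSensor)
  extends ControllerMutationQuota {
  def record(permits: Double): Unit = {
    sensor.record(permits)
    val throttle = sensor.throttleTimeMs
    if (throttle > 0) throw new ThrottlingQuotaExceededException(throttle)
  }
  def throttleTimeMs: Int = sensor.throttleTimeMs
}

object QuotaSketch {
  // Pick the implementation from the request's API version (hypothetical signature).
  def newQuotaFor(sensor: Option[SimpleSensor],
                  apiVersion: Int,
                  strictSinceVersion: Int): ControllerMutationQuota =
    sensor match {
      case None => UnboundedControllerMutationQuota
      case Some(s) if apiVersion >= strictSinceVersion =>
        new StrictControllerMutationQuota(s)
      case Some(s) => new PermissiveControllerMutationQuota(s)
    }

  // The response carries the higher of the mutation and request throttle times.
  def finalThrottleTimeMs(quota: ControllerMutationQuota,
                          requestThrottleTimeMs: Int): Int =
    math.max(quota.throttleTimeMs, requestThrottleTimeMs)
}
```

   In this sketch, a handler would call `QuotaSketch.newQuotaFor(sensor,
apiVersion, 6)` once per request, hand the result to whatever performs the
topic creation, and call `finalThrottleTimeMs` just before building the
response.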




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

