dajac commented on a change in pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#discussion_r456273645



##########
File path: core/src/main/scala/kafka/server/ClientQuotaManager.scala
##########
@@ -234,55 +266,85 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
   }
 
   /**
-   * Returns maximum value (produced/consume bytes or request processing time) 
that could be recorded without guaranteed throttling.
-   * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
-   * This is used for deciding the maximum bytes that can be fetched at once
+   * Records that a user/clientId accumulated or would like to accumulate the 
provided amount at the
+   * specified time, returns throttle time in milliseconds. Depending on 
the {QuotaEnforcementType}
+   * used, the behavior of this method changes:
+   * - QuotaEnforcementType.STRICT verifies the quota is not violated before 
accumulating the
+   *   provided value. If it is, the value is not accumulated and the throttle 
time represents
+   *   the time to wait before the quota comes back to the defined limit.
+   * - QuotaEnforcementType.PERMISSIVE verifies the quota is not violated 
after accumulating the
+   *   provided value. If it is, the value is still accumulated and the 
throttle time represents
+   *   the time to wait before the quota comes back to the defined limit.
+   *
+   * @param session The session from which the user is extracted
+   * @param clientId The client id
+   * @param value The value to accumulate
+   * @param timeMs The time at which to accumulate the value
+   * @return The throttle time in milliseconds defined as the time to wait 
until the average
+   *         rate gets back to the defined quota
    */
-  def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = {
-    if (quotasEnabled) {
-      val clientSensors = getOrCreateQuotaSensors(session, clientId)
-      Option(quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags.asJava))
-        .map(_.toDouble * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds)
-        .getOrElse(Double.MaxValue)
-    } else {
-      Double.MaxValue
-    }
-  }
-
   def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: 
Double, timeMs: Long): Int = {
     val clientSensors = getOrCreateQuotaSensors(session, clientId)
     try {
-      clientSensors.quotaSensor.record(value, timeMs)
+      clientSensors.quotaSensor.record(value, timeMs, quotaEnforcementType)
       0
     } catch {
       case e: QuotaViolationException =>
-        val throttleTimeMs = throttleTime(e.value, e.bound, 
windowSize(e.metric, timeMs)).toInt
+        val throttleTimeMs = throttleTime(e, timeMs).toInt
         debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). 
Delay time: ($throttleTimeMs)")
         throttleTimeMs
     }
   }
 
-  /** "Unrecord" the given value that has already been recorded for the given 
user/client by recording a negative value
-    * of the same quantity.
-    *
-    * For a throttled fetch, the broker should return an empty response and 
thus should not record the value. Ideally,
-    * we would like to compute the throttle time before actually recording the 
value, but the current Sensor code
-    * couples value recording and quota checking very tightly. As a 
workaround, we will unrecord the value for the fetch
-    * in case of throttling. Rate keeps the sum of values that fall in each 
time window, so this should bring the
-    * overall sum back to the previous value.
-    */
+  /**
+   * Records that a user/clientId changed some metric being throttled without 
checking for
+   * quota violation. The aggregate value will subsequently be used for 
throttling when the
+   * next request is processed.
+   */
+  def recordNoThrottle(session: Session, clientId: String, value: Double): 
Unit = {
+    val clientSensors = getOrCreateQuotaSensors(session, clientId)
+    clientSensors.quotaSensor.record(value, time.milliseconds(), 
QuotaEnforcementType.NONE)
+  }
+
+  /**
+   * "Unrecord" the given value that has already been recorded for the given 
user/client by recording a negative value
+   * of the same quantity.
+   *
+   * For a throttled fetch, the broker should return an empty response and 
thus should not record the value. Ideally,
+   * we would like to compute the throttle time before actually recording the 
value, but the current Sensor code
+   * couples value recording and quota checking very tightly. As a workaround, 
we will unrecord the value for the fetch
+   * in case of throttling. Rate keeps the sum of values that fall in each 
time window, so this should bring the
+   * overall sum back to the previous value.
+   */
   def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, 
timeMs: Long): Unit = {
     val clientSensors = getOrCreateQuotaSensors(request.session, 
request.header.clientId)
-    clientSensors.quotaSensor.record(value * (-1), timeMs, false)
+    clientSensors.quotaSensor.record(value * (-1), timeMs, 
QuotaEnforcementType.NONE)
   }
 
   /**
-    * Throttle a client by muting the associated channel for the given 
throttle time.
-    * @param request client request
-    * @param throttleTimeMs Duration in milliseconds for which the channel is 
to be muted.
-    * @param channelThrottlingCallback Callback for channel throttling
-    * @return ThrottledChannel object
-    */
+   * Returns maximum value that could be recorded without guaranteed 
throttling.
+   * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
+   * This is used for deciding the maximum bytes that can be fetched at once
+   */
+  def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = {
+    if (quotasEnabled) {
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
+      Option(quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags.asJava))
+        .map(_.toDouble * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds)
+        .getOrElse(Double.MaxValue)
+    } else {
+      Double.MaxValue
+    }
+  }
+
+  /**
+   * Throttle a client by muting the associated channel for the given throttle 
time.
+   *
+   * @param request client request
+   * @param throttleTimeMs Duration in milliseconds for which the channel is 
to be muted.
+   * @param channelThrottlingCallback Callback for channel throttling
+   * @return ThrottledChannel object

Review comment:
       Indeed, good catch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to