dajac commented on a change in pull request #8933: URL: https://github.com/apache/kafka/pull/8933#discussion_r456273645
########## File path: core/src/main/scala/kafka/server/ClientQuotaManager.scala ########## @@ -234,55 +266,85 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /** - * Returns maximum value (produced/consume bytes or request processing time) that could be recorded without guaranteed throttling. - * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. - * This is used for deciding the maximum bytes that can be fetched at once + * Records that a user/clientId accumulated or would like to accumulate the provided amount at the + * the specified time, returns throttle time in milliseconds. Depending on the {QuotaEnforcementType} + * used, the behavior of this method changes: + * - QuotaEnforcementType.Strict verifies the quota is not violated before accumulating the + * provided value. If it is, the value is not accumulated and the throttle time represents + * the time to wait before the quota comes back to the defined limit. + * - QuotaEnforcementType.PERMISSIVE verifies the quota is not violated after accumulating the + * provided value. If it is, the value is still accumulated and the throttle time represents + * the time to wait before the quota comes back to the defined limit. + * + * @param session The session from which the user is extracted + * @param clientId The client id + * @param value The value to accumulate + * @param timeMs The time at which to accumulate the value + * @return The throttle time in milliseconds defines as the time to wait until the average + * rate gets back to the defined quota */ - def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = { - if (quotasEnabled) { - val clientSensors = getOrCreateQuotaSensors(session, clientId) - Option(quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags.asJava)) - .map(_.toDouble * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds) - .getOrElse(Double.MaxValue) - } else { - Double.MaxValue - } - } - def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = { val clientSensors = getOrCreateQuotaSensors(session, clientId) try { - clientSensors.quotaSensor.record(value, timeMs) + clientSensors.quotaSensor.record(value, timeMs, quotaEnforcementType) 0 } catch { case e: QuotaViolationException => - val throttleTimeMs = throttleTime(e.value, e.bound, windowSize(e.metric, timeMs)).toInt + val throttleTimeMs = throttleTime(e, timeMs).toInt debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)") throttleTimeMs } } - /** "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value - * of the same quantity. - * - * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, - * we would like to compute the throttle time before actually recording the value, but the current Sensor code - * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch - * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the - * overall sum back to the previous value. - */ + /** + * Records that a user/clientId changed some metric being throttled without checking for + * quota violation. The aggregate value will subsequently be used for throttling when the + * next request is processed. + */ + def recordNoThrottle(session: Session, clientId: String, value: Double): Unit = { + val clientSensors = getOrCreateQuotaSensors(session, clientId) + clientSensors.quotaSensor.record(value, time.milliseconds(), QuotaEnforcementType.NONE) + } + + /** + * "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value + * of the same quantity. + * + * For a throttled fetch, the broker should return an empty response and thus should not record the value. Ideally, + * we would like to compute the throttle time before actually recording the value, but the current Sensor code + * couples value recording and quota checking very tightly. As a workaround, we will unrecord the value for the fetch + * in case of throttling. Rate keeps the sum of values that fall in each time window, so this should bring the + * overall sum back to the previous value. + */ def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = { val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId) - clientSensors.quotaSensor.record(value * (-1), timeMs, false) + clientSensors.quotaSensor.record(value * (-1), timeMs, QuotaEnforcementType.NONE) } /** - * Throttle a client by muting the associated channel for the given throttle time. - * @param request client request - * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted. - * @param channelThrottlingCallback Callback for channel throttling - * @return ThrottledChannel object - */ + * Returns maximum value that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. + * This is used for deciding the maximum bytes that can be fetched at once + */ + def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = { + if (quotasEnabled) { + val clientSensors = getOrCreateQuotaSensors(session, clientId) + Option(quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags.asJava)) + .map(_.toDouble * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds) + .getOrElse(Double.MaxValue) + } else { + Double.MaxValue + } + } + + /** + * Throttle a client by muting the associated channel for the given throttle time. + * + * @param request client request + * @param throttleTimeMs Duration in milliseconds for which the channel is to be muted. + * @param channelThrottlingCallback Callback for channel throttling + * @return ThrottledChannel object Review comment: Indeed, good catch. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org