apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r439786379
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + throttleTimeMs + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. + * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) + metric.config(rateQuotaMetricConfig(quotaLimit)) + info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { + val quotaEntity = listenerOpt.getOrElse("broker") + metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", + s"Tracking $quotaEntity connection creation rate", + rateQuotaMetricTags(listenerOpt)) Review comment: We already have a yammer Meter that reports connection creation rate, also tagged with listener and processor. This same meter tracks both connection creation rate and total. Here is connetion creation rate metric: kafka.server:type=socket-server-metrics,listener={listener_name},networkProcessor={#},name=connection-creation-rate I had to add a Rate metric here because it both tracks rate and quota, and allows to calculate throttle time the same way we do with client (request & bandwidth) quotas. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org