dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r440028429



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
##########
@@ -572,16 +572,18 @@ public synchronized void removeReporter(MetricsReporter 
reporter) {
         }
     }
 
-    synchronized void registerMetric(KafkaMetric metric) {
+    synchronized void registerMetric(KafkaMetric metric, boolean report) {

Review comment:
       Thinking a bit more about this, did you consider adding the flag to 
`MetricConfig`? It may be a bit simpler and cleaner as it avoids having to add 
the flag to all the methods. What do you think?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: 
com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = 
math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
       Thanks for the clarification. I am sorry but I misread the code the 
first time.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))
+  }
+
+  private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
+    new MetricConfig()
+      .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
+  }
+
+  private def rateQuotaMetricTags(listenerOpt: Option[String]): 
util.Map[String, String] = {
+    val tags = new util.LinkedHashMap[String, String]
+    listenerOpt.foreach(listener => tags.put("listener", listener))
+    tags

Review comment:
       What about returning `Collections.emptyMap` when `listenerOpt` is not 
defined and using `Collections.singletonMap` when it is?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1258,11 +1274,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: 
com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()

Review comment:
       nit: `()` can be omitted here.

##########
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -163,13 +167,66 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, 
"Connections not closed")
   }
 
+  @Test
+  def testDynamicListenerConnectionCreationRateQuota(): Unit = {
+    // Create another listener. PLAINTEXT is an inter-broker listener
+    // keep default limits
+    val newListenerNames = Seq("PLAINTEXT", "EXTERNAL")
+    val newListeners = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"
+    val props = new Properties
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT")
+    reconfigureServers(props, perBrokerConfig = true, 
(KafkaConfig.ListenersProp, newListeners))
+    waitForListener("EXTERNAL")
+
+    // new broker-wide connection rate limit
+    val connRateLimit = 18
+
+    // before setting connection rate to 10, verify we can do at least double 
that by default (no limit)
+    verifyConnectionRate(2 * connRateLimit, Int.MaxValue, "PLAINTEXT")
+
+    // Reduce total broker connection rate limit to 10 at the cluster level 
and verify the limit is enforced

Review comment:
       Should this say `to 18`? `connRateLimit` is 18 here, not 10.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1258,11 +1274,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: 
com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()

Review comment:
       It is a bit confusing to have `startTimeMs` and `startNs` defined a few 
lines apart. Would it be worth renaming `startTimeMs` to `startThrottleTimeMs` 
to clearly state that it is used as part of the throttle time computation? It 
would also be consistent with `endThrottleTimeMs`.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))

Review comment:
       nit: Are the parentheses around `listenerOpt` really necessary?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1308,18 +1410,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")

Review comment:
       Not related to your PR, but it seems that `listenerMax` is never defined. 
I think that `value` should be used here instead. Could you fix this? Could you 
also use `MaxConnectionsProp` instead of `max.connections`, as you already did 
for the other one?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",

Review comment:
       nit: I would rename this one now.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")

Review comment:
       nit: As the `quotaEntity` is also computed by the calling methods, would 
it make sense to pass it as an argument to `connectionRateMetricName`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, 
listenerThrottleTimeMs)
+      throttleTimeMs
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))

Review comment:
       Ack. I did not know this.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -329,15 +613,46 @@ class ConnectionQuotasTest {
   }
 
   // this method must be called on a separate thread, because 
connectionQuotas.inc() may block
+  private def acceptConnectionsAndVerifyRate(connectionQuotas: 
ConnectionQuotas,
+                                             listenerDesc: ListenerDesc,
+                                             numConnections: Long,
+                                             timeIntervalMs: Long,
+                                             expectedRate: Int,
+                                             epsilon: Int) : Unit = {
+    val startTimeMs = System.currentTimeMillis
+    acceptConnections(connectionQuotas, listenerDesc.listenerName, 
listenerDesc.defaultIp, numConnections, timeIntervalMs)
+    val elapsedSeconds = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis - startTimeMs)
+    val actualRate = (numConnections.toDouble / elapsedSeconds).toInt
+    if (actualRate - epsilon > expectedRate || actualRate + epsilon < 
expectedRate)
+      throw new TestFailedException(
+        (e: StackDepthException) =>
+          Some(s"Expected rate $expectedRate, but got $actualRate 
($numConnections connections / $elapsedSeconds sec)"),
+        None,
+        Position.here)

Review comment:
       Can't we use `assertTrue` here to simplify?

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)

Review comment:
       nit: We could perhaps create a small helper method like `brokerProps` that 
accepts a Map of custom configuration pairs and returns a `KafkaConfig`. That 
would reduce the boilerplate code.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate 
> broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of 
connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, 
and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 
connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the 
measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to 
be recorded", meter.count() > 0)
+      }
+
+      // while the connection creation rate was throttled,
+      // expect all connections got created (not limit on the number of 
connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionListenerMustBeAboveZero(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+
+    val maxListenerConnectionRate = 0
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
maxListenerConnectionRate.toString).asJava
+    assertThrows[ConfigException] {
+      
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionRateReconfiguration(): Unit = {

Review comment:
       I wonder if we could just check that the metric's config is correctly 
re-configured instead of testing the number of connections accepted. The goal 
of the test is not really to verify that the quota works but rather to ensure 
that the metric is correctly re-configured. Have you considered this? The same 
would apply to `testMaxBrokerConnectionRateReconfiguration`.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)

Review comment:
       It seems that almost all the test cases instantiate an executor with 
`listeners.size`. Have you considered moving this to the `setUp` method and 
moving the `shutdownNow` to the `tearDown`?

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec

Review comment:
       I really like the explanation next to the constants! I would recommend 
grouping all the constants at the beginning of the test case. That would help 
to get a quick overview of the test case.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate 
> broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of 
connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, 
and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 
connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the 
measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to 
be recorded", meter.count() > 0)
+      }

Review comment:
       nit: Would it make sense to add a counterpart of 
`verifyNoBlockedPercentRecordedOnAllListeners` for this block? Something like 
`verifyNonZeroBlockedPercentRecordedOnAllListeners`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to