dajac commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r440028429
########## File path: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java ########## @@ -572,16 +572,18 @@ public synchronized void removeReporter(MetricsReporter reporter) { } } - synchronized void registerMetric(KafkaMetric metric) { + synchronized void registerMetric(KafkaMetric metric, boolean report) { Review comment: Thinking a bit more about this, did you consider adding the flag to `MetricConfig`? It may be a bit simpler and cleaner as it avoids having to add the flag to all the methods. What do you think? ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def waitForConnectionSlot(listenerName: ListenerName, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { counts.synchronized { - if (!connectionSlotAvailable(listenerName)) { + val startTimeMs = time.milliseconds() + val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0) + + if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) { val startNs = time.nanoseconds + val endThrottleTimeMs = startTimeMs + throttleTimeMs + var remainingThrottleTimeMs = throttleTimeMs do { - counts.wait() - } while (!connectionSlotAvailable(listenerName)) + counts.wait(remainingThrottleTimeMs) Review comment: Thanks for the clarification. I am sorry but I misread the code the first time. ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. 
The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) + metric.config(rateQuotaMetricConfig(quotaLimit)) + info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { + val quotaEntity = listenerOpt.getOrElse("broker") + metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", + s"Tracking $quotaEntity connection creation rate", + rateQuotaMetricTags(listenerOpt)) + } + + private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = { + new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quota(new Quota(quotaLimit, true)) + } + + private def rateQuotaMetricTags(listenerOpt: Option[String]): util.Map[String, String] = { + val tags = new util.LinkedHashMap[String, String] + listenerOpt.foreach(listener => tags.put("listener", listener)) + tags Review comment: What about returning `Collections.emptyMap` when `listenerOpt` is not defined and using `Collections.singletonMap` when it is? 
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1258,11 +1274,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def waitForConnectionSlot(listenerName: ListenerName, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { counts.synchronized { - if (!connectionSlotAvailable(listenerName)) { + val startTimeMs = time.milliseconds() Review comment: nit: `()` can be omitted here. ########## File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala ########## @@ -163,13 +167,66 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed") } + @Test + def testDynamicListenerConnectionCreationRateQuota(): Unit = { + // Create another listener. PLAINTEXT is an inter-broker listener + // keep default limits + val newListenerNames = Seq("PLAINTEXT", "EXTERNAL") + val newListeners = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0" + val props = new Properties + props.put(KafkaConfig.ListenersProp, newListeners) + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT") + reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners)) + waitForListener("EXTERNAL") + + // new broker-wide connection rate limit + val connRateLimit = 18 + + // before setting connection rate to 10, verify we can do at least double that by default (no limit) + verifyConnectionRate(2 * connRateLimit, Int.MaxValue, "PLAINTEXT") + + // Reduce total broker connection rate limit to 10 at the cluster level and verify the limit is enforced Review comment: `to 18`? 
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1258,11 +1274,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def waitForConnectionSlot(listenerName: ListenerName, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { counts.synchronized { - if (!connectionSlotAvailable(listenerName)) { + val startTimeMs = time.milliseconds() Review comment: It is a bit confusing to have `startTimeMs` and `startNs` defined a few lines apart. Would it be worth renaming `startTimeMs` to `startThrottleTimeMs` to clearly state that it is used as part of the throttle time computation? It would also be consistent with `endThrottleTimeMs`. ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. 
The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) Review comment: nit: Are the parentheses around `listenerOpt` really necessary? ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1308,18 +1410,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { val value = maxConnections(configs) if (value <= 0) throw new ConfigException("Invalid max.connections $listenerMax") Review comment: Not related to your PR but it seems that `listenerMax` is never defined. I think that `value` should be used instead here. Could you fix this? Could you also use `MaxConnectionsProp` instead of `max.connections` as you already did for the others? ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. 
The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) + metric.config(rateQuotaMetricConfig(quotaLimit)) + info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { + val quotaEntity = listenerOpt.getOrElse("broker") + metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", Review comment: nit: I would rename this one now. ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. 
The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) + metric.config(rateQuotaMetricConfig(quotaLimit)) + info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { + val quotaEntity = listenerOpt.getOrElse("broker") Review comment: nit: As the `quotaEntity` is also computed by the caller methods, would it make sense to pass it as an argument to `connectionRateMetricName`? ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. 
The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { + val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + + if (protectedListener(listenerName)) { + listenerThrottleTimeMs + } else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) + throttleTimeMs + } + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { + try { + sensor.record(1.0, timeMs) + 0 + } catch { + case e: QuotaViolationException => + val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt + debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") + throttleTimeMs + } + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { + val quotaEntity = listenerOpt.getOrElse("broker") + val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) + sensor.add(connectionRateMetricName(listenerOpt), new Rate) + info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") + sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { + val metric = metrics.metric(connectionRateMetricName((listenerOpt))) + metric.config(rateQuotaMetricConfig(quotaLimit)) + info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { + val quotaEntity = listenerOpt.getOrElse("broker") + metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", + s"Tracking $quotaEntity connection creation rate", + rateQuotaMetricTags(listenerOpt)) Review comment: Ack. I did not know this. 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -329,15 +613,46 @@ class ConnectionQuotasTest { } // this method must be called on a separate thread, because connectionQuotas.inc() may block + private def acceptConnectionsAndVerifyRate(connectionQuotas: ConnectionQuotas, + listenerDesc: ListenerDesc, + numConnections: Long, + timeIntervalMs: Long, + expectedRate: Int, + epsilon: Int) : Unit = { + val startTimeMs = System.currentTimeMillis + acceptConnections(connectionQuotas, listenerDesc.listenerName, listenerDesc.defaultIp, numConnections, timeIntervalMs) + val elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis - startTimeMs) + val actualRate = (numConnections.toDouble / elapsedSeconds).toInt + if (actualRate - epsilon > expectedRate || actualRate + epsilon < expectedRate) + throw new TestFailedException( + (e: StackDepthException) => + Some(s"Expected rate $expectedRate, but got $actualRate ($numConnections connections / $elapsedSeconds sec)"), + None, + Position.here) Review comment: Can't we use `assertTrue` here to simplify? 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -302,21 +312,295 @@ class ConnectionQuotasTest { } // all connections should get added overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS)) - listeners.values.foreach { listener => - assertEquals(s"Number of connections on $listener:", - listenerMaxConnections, connectionQuotas.get(listener.defaultIp)) + verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the total rate < broker-wide quota, and verify there is no throttling + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + verifyNoBlockedPercentRecordedOnAllListeners() + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 90 + val props = brokerPropsWithDefaultConnectionLimits + 
props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) Review comment: nit: We could perhaps create a small helper method like `brokerProps` that accepts a Map of custom configuration pairs and returns a `KafkaConfig`. That would reduce the boilerplate code. ########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -302,21 +312,295 @@ class ConnectionQuotasTest { } // all connections should get added overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS)) - listeners.values.foreach { listener => - assertEquals(s"Number of connections on $listener:", - listenerMaxConnections, connectionQuotas.get(listener.defaultIp)) + verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the total rate < broker-wide quota, and verify there is no throttling + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + 
verifyNoBlockedPercentRecordedOnAllListeners() + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 90 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // each listener creates connections such that the total connection rate > broker-wide quota + val connCreateIntervalMs = 10 // connection creation rate = 100 + val connectionsPerListener = 400 + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) + } + futures.foreach(_.get(20, TimeUnit.SECONDS)) + + // verify that connections on non-inter-broker listener are throttled + verifyOnlyNonInterBrokerListenersBlockedPercentRecorded() + + // expect all connections to be created (no limit on the number of connections) + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val listenerRateLimit = 50 + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + val listenerConfig = 
Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava + addListenersAndVerify(config, listenerConfig, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the rate < listener quota on every listener, and verify there is no throttling + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) + } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + verifyNoBlockedPercentRecordedOnAllListeners() + + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 125 + val listenerRateLimit = 30 + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava + addListenersAndVerify(config, listenerConfig, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the rate > listener quota on every listener + // run a bit longer (20 seconds) to also verify the throttle rate + val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec + val futures = listeners.values.map { listener => + executor.submit((() => + // 
epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window + acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable) + } + futures.foreach(_.get(30, TimeUnit.SECONDS)) + + // verify that every listener was throttled + blockedPercentMeters.foreach { case (name, meter) => + assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0) + } + + // while the connection creation rate was throttled, + // expect all connections got created (not limit on the number of connections) + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testMaxListenerConnectionListenerMustBeAboveZero(): Unit = { + val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName) + + val maxListenerConnectionRate = 0 + val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> maxListenerConnectionRate.toString).asJava + assertThrows[ConfigException] { + connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig) + } + } + + @Test + def testMaxListenerConnectionRateReconfiguration(): Unit = { Review comment: I wonder if we could just check that the metric's config is correctly re-configured instead of testing the number of connections accepted. The goal of the test is not really to verify that the quota works but rather to ensure that metric is correctly re-configured. Have you considered this? The same would apply to `testMaxBrokerConnectionRateReconfiguration`. 
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -302,21 +312,295 @@ class ConnectionQuotasTest { } // all connections should get added overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS)) - listeners.values.foreach { listener => - assertEquals(s"Number of connections on $listener:", - listenerMaxConnections, connectionQuotas.get(listener.defaultIp)) + verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the total rate < broker-wide quota, and verify there is no throttling + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + verifyNoBlockedPercentRecordedOnAllListeners() + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 90 + val props = brokerPropsWithDefaultConnectionLimits + 
props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) Review comment: It seems that almost all the test cases instantiate an executor with `listeners.size`. Have you considered moving this to the `setUp` method and moving the `shutdownNow` to the `tearDown`? ########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -302,21 +312,295 @@ class ConnectionQuotasTest { } // all connections should get added overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS)) - listeners.values.foreach { listener => - assertEquals(s"Number of connections on $listener:", - listenerMaxConnections, connectionQuotas.get(listener.defaultIp)) + verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the total rate < broker-wide quota, and verify there is no throttling + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec Review comment: I really like the explanation next to the constants! 
I would recommend grouping all the constants at the beginning of the test case. That would help the reader get a quick overview of the test case. ########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -302,21 +312,295 @@ class ConnectionQuotasTest { } // all connections should get added overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS)) - listeners.values.foreach { listener => - assertEquals(s"Number of connections on $listener:", - listenerMaxConnections, connectionQuotas.get(listener.defaultIp)) + verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the total rate < broker-wide quota, and verify there is no throttling + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + verifyNoBlockedPercentRecordedOnAllListeners() + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def 
testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 90 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // each listener creates connections such that the total connection rate > broker-wide quota + val connCreateIntervalMs = 10 // connection creation rate = 100 + val connectionsPerListener = 400 + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) + } + futures.foreach(_.get(20, TimeUnit.SECONDS)) + + // verify that connections on non-inter-broker listener are throttled + verifyOnlyNonInterBrokerListenersBlockedPercentRecorded() + + // expect all connections to be created (no limit on the number of connections) + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val listenerRateLimit = 50 + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava + addListenersAndVerify(config, listenerConfig, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // 
create connections with the rate < listener quota on every listener, and verify there is no throttling + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) + } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + verifyNoBlockedPercentRecordedOnAllListeners() + + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 125 + val listenerRateLimit = 30 + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava + addListenersAndVerify(config, listenerConfig, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the rate > listener quota on every listener + // run a bit longer (20 seconds) to also verify the throttle rate + val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec + val futures = listeners.values.map { listener => + executor.submit((() => + // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window + acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, 
listenerRateLimit, 5)): Runnable) + } + futures.foreach(_.get(30, TimeUnit.SECONDS)) + + // verify that every listener was throttled + blockedPercentMeters.foreach { case (name, meter) => + assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0) + } Review comment: nit: Would it make sense to add the counterpart of `verifyNoBlockedPercentRecordedOnAllListeners` for this block? It could be something like `verifyNonZeroBlockedPercentRecordedOnAllListeners`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org