apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r453965142
########## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ########## @@ -302,21 +312,295 @@ class ConnectionQuotasTest { } // all connections should get added overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS)) - listeners.values.foreach { listener => - assertEquals(s"Number of connections on $listener:", - listenerMaxConnections, connectionQuotas.get(listener.defaultIp)) + verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = { + val brokerRateLimit = 125 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) + val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics) + + addListenersAndVerify(config, connectionQuotas) + + val executor = Executors.newFixedThreadPool(listeners.size) + try { + // create connections with the total rate < broker-wide quota, and verify there is no throttling + val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total) + val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec + val futures = listeners.values.map { listener => + executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable) } + futures.foreach(_.get(10, TimeUnit.SECONDS)) + + // the blocked percent should still be 0, because no limits were reached + verifyNoBlockedPercentRecordedOnAllListeners() + verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener) + } finally { + executor.shutdownNow() + } + } + + @Test + def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = { + val brokerRateLimit = 90 + val props = brokerPropsWithDefaultConnectionLimits + props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString) + val config = KafkaConfig.fromProps(props) Review comment: the test code has just one extra prop in addition to `brokerPropsWithDefaultConnectionLimits` -- somehow it is easier to read the current way. I will keep it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org