apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r453965142



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)

Review comment:
       the test code has just one extra prop in addition to 
`brokerPropsWithDefaultConnectionLimits` -- somehow it is easier to read the 
current way. I will keep it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to