apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r453962326



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)

Review comment:
       others use single thread, but yes, makes sense just to create one 
upfront using `listeners.size`. Will do that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to