apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r439143850



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
       This is exactly the behavior proposed in the KIP: if we reach either
limit (number of connections or connection rate), we need to wait. So if the
rate-limit delay has passed but there is still no room for a new connection,
we have to keep waiting for a connection slot. Note, however, that when we are
waiting for an inter-broker connection slot, the broker finds and closes a
connection on another listener to make room for the inter-broker connection.
See KIP-402.
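
To make the waiting rule concrete, here is a minimal, self-contained sketch of a wait that ends only when both conditions hold: the throttle delay has elapsed and a slot is free. `SlotGate`, `acquire`, and `release` are hypothetical names made up for illustration, not the actual `ConnectionQuotas` code, and the inter-broker case (closing a connection on another listener) is deliberately not modeled.

```scala
// Hypothetical, simplified model of the waiting rule; not Kafka's ConnectionQuotas.
final class SlotGate(maxSlots: Int) {
  private val lock = new Object
  private var used = 0

  /** Blocks until the throttle delay has elapsed AND a slot is free,
    * then takes the slot. Returns the time spent waiting, in milliseconds. */
  def acquire(throttleTimeMs: Long): Long = lock.synchronized {
    val startNs = System.nanoTime()
    val endThrottleNs = startNs + math.max(throttleTimeMs, 0L) * 1000000L
    var remainingMs = math.max(throttleTimeMs, 0L)
    while (remainingMs > 0 || used >= maxSlots) {
      // wait(0) blocks until notified, which is what we want once the
      // throttle delay has passed and only the slot limit is holding us back.
      lock.wait(remainingMs)
      // Round up so we never leave the loop before the delay has fully elapsed.
      val remainingNs = endThrottleNs - System.nanoTime()
      remainingMs = math.max((remainingNs + 999999L) / 1000000L, 0L)
    }
    used += 1
    (System.nanoTime() - startNs) / 1000000L
  }

  /** Frees a slot and wakes up any waiting acquirers. */
  def release(): Unit = lock.synchronized {
    used -= 1
    lock.notifyAll()
  }
}
```

In this sketch a caller that is throttled for 10 ms while all slots are taken keeps blocking after the 10 ms have passed, and only proceeds once another thread calls `release()`.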
   



