apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r439143850
##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
This is exactly the behavior proposed in the KIP: if we reach either limit (number of connections or connection rate), we need to wait. So if there is no room for a new connection and the rate-limit delay has already passed, we still have to wait for a connection slot. However, remember that when we are waiting on an inter-broker connection slot, the broker finds and closes a connection on another listener to accommodate the inter-broker connection. See KIP-402.
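To make the "wait on either limit" rule concrete, here is a minimal, self-contained sketch of a loop that blocks until both the throttle delay has elapsed and a slot is free. It is an illustration only, not the code in this PR: `SlotGate`, `acquire`, and `release` are hypothetical names, it uses `System.nanoTime` instead of Kafka's `Time`, it tracks a single counter rather than per-listener counts, and it does not model the inter-broker case where a connection on another listener is closed.

```scala
// Hypothetical, simplified stand-in for the waiting logic discussed above.
// Not the Kafka implementation; names and structure are assumptions.
final class SlotGate(maxConnections: Int) {
  private val lock = new Object
  private var active = 0

  /** Blocks until the throttle delay has passed AND a slot is free,
    * then takes the slot. Returns the time spent in this call, in ms. */
  def acquire(throttleTimeMs: Long): Long = lock.synchronized {
    val startNs = System.nanoTime()
    val endThrottleNs = startNs + math.max(throttleTimeMs, 0L) * 1000000L
    var remainingNs = endThrottleNs - startNs

    while (remainingNs > 0 || active >= maxConnections) {
      if (remainingNs > 0) {
        // Timed wait for the rest of the throttle delay. Use at least 1 ms,
        // because wait(0) means "wait with no timeout".
        lock.wait(math.max(remainingNs / 1000000L, 1L))
      } else {
        // Throttle delay is over but no slot is free: wait for release().
        lock.wait()
      }
      // Recompute after every wake-up (timeout, notify, or spurious wake-up).
      remainingNs = endThrottleNs - System.nanoTime()
    }

    active += 1
    (System.nanoTime() - startNs) / 1000000L
  }

  /** Frees a slot and wakes any waiting acceptor. */
  def release(): Unit = lock.synchronized {
    active -= 1
    lock.notifyAll()
  }
}
```

With `new SlotGate(1)`, a call to `acquire(50L)` returns only after at least 50 ms even though a slot is free, and a second `acquire(0L)` blocks until another thread calls `release()`. The point of re-checking both conditions after each wake-up is that a notification arriving during the throttle window must not let the caller through early.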