splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r504219669



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1371,6 +1492,45 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the connection
+   *
+   * @param address
+   * @param timeMs
+   * @return
+   */
+  private def recordIpConnectionMaybeThrottle(address: InetAddress, timeMs: Long): Long = {
+    val connectionRateQuota = connectionRateForIp(address)
+    val quotaEnabled = connectionRateQuota != DynamicConfig.Ip.UnlimitedConnectionCreationRate
+    if (!quotaEnabled) {
+      return 0
+    }
+    val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateForIp(address), IpQuotaEntity(address))
+    val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs)
+    if (throttleMs > 0) {
+      // unrecord the connection since we won't accept the connection
+      sensor.record(-1.0, timeMs, false)
+    }

Review comment:
       Actually, I thought about this a bit more. In the case where an IP is
supposed to be completely throttled (rate limit == 0), we would be accepting
connections at a rate of 1, which would be very weird/unacceptable behavior. I
left it as is. Let me know if that makes sense or if there are holes in my
logic.
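
One way to read the concern above, as a rough sketch rather than the actual Kafka code: assume the alternative under discussion was to check the quota before recording the connection, instead of recording first and un-recording on a violation. `WindowedCounter`, `QuotaOrdering`, and the 1-second window are hypothetical stand-ins for illustration and are not Kafka's `Sensor` or quota API.

```scala
// Hypothetical, simplified model of a windowed rate; NOT Kafka's Sensor API.
final class WindowedCounter(windowMs: Long) {
  private var events = Vector.empty[(Long, Double)]

  // Drop samples that fell out of the window, then append the new one.
  def record(value: Double, timeMs: Long): Unit =
    events = events.filter { case (t, _) => timeMs - t < windowMs } :+ ((timeMs, value))

  // Sum of in-window samples divided by the window length in seconds.
  def rate(timeMs: Long): Double = {
    val sum = events.collect { case (t, v) if timeMs - t < windowMs => v }.sum
    sum / (windowMs / 1000.0)
  }
}

object QuotaOrdering {
  // Ordering kept in the PR: record, check, un-record if the quota is violated.
  def recordThenCheck(counter: WindowedCounter, limit: Double, timeMs: Long): Boolean = {
    counter.record(1.0, timeMs)
    val accepted = counter.rate(timeMs) <= limit
    if (!accepted) counter.record(-1.0, timeMs)
    accepted
  }

  // Assumed alternative: check the current rate, record only if accepted.
  def checkThenRecord(counter: WindowedCounter, limit: Double, timeMs: Long): Boolean = {
    val accepted = counter.rate(timeMs) <= limit
    if (accepted) counter.record(1.0, timeMs)
    accepted
  }

  // Offer one connection every 100 ms for 5 seconds and count acceptances.
  def acceptedCount(f: (WindowedCounter, Double, Long) => Boolean, limit: Double): Int = {
    val counter = new WindowedCounter(windowMs = 1000L)
    (0L until 5000L by 100L).count(t => f(counter, limit, t))
  }
}
```

Under these assumptions, with `limit = 0.0`, `acceptedCount(QuotaOrdering.recordThenCheck, 0.0)` accepts no connections, while `acceptedCount(QuotaOrdering.checkThenRecord, 0.0)` accepts one connection per window (5 over the 5 simulated seconds), which is the "rate of 1" behavior the comment describes.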




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

