splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521614527
##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int):
Unit = {
// if there is a connection waiting on the rate throttle delay, we will
let it wait the original delay even if
// the rate limit increases, because it is just one connection per
listener and the code is simpler that way
- updateConnectionRateQuota(maxConnectionRate)
+ updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+ }
+
+ /**
+ * Update the connection rate quota for a given IP and updates quota configs
for updated IPs.
+ * If an IP is given, metric config will be updated only for the given IP,
otherwise
+ * all metric configs will be checked and updated if required.
+ *
+ * @param ip ip to update or default if None
+ * @param maxConnectionRate new connection rate, or resets entity to default
if None
+ */
+ def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate:
Option[Int]): Unit = {
+ def isIpConnectionRateMetric(metricName: MetricName) = {
+ metricName.name == ConnectionRateMetricName &&
+ metricName.group == MetricsGroup &&
+ metricName.tags.containsKey(IpMetricTag)
+ }
+
+ def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+ quotaLimit != metric.config.quota.bound
+ }
+ counts.synchronized {
+ ip match {
+ case Some(addr) =>
+ val address = InetAddress.getByName(addr)
Review comment:
Moved IP resolution to `IpConfigHandler`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org