josefk31 commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2109435789


##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -17,16 +17,21 @@
 package kafka.server
 
 import kafka.server.ClientQuotaManager.BaseUserEntity
+import org.apache.kafka.common.Cluster
 
+import java.{lang, util}

Review Comment:
   Do we need such a broad import statement? The rest of the imports seem to 
bring in only the classes that are actually used. 



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     case None => new DefaultQuotaCallback
   }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+  private val activeQuotaEntities = new ConcurrentHashMap[KafkaQuotaEntity, 
Boolean]()

Review Comment:
   Does this need to be a `ConcurrentHashMap` if the only place it is updated or 
read is within the `updateQuota` method, which is already guarded by the write 
`lock`? In theory, while the write lock is held, no other thread should be 
able to acquire a read lock.  
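
   A minimal sketch of the idea, assuming every access really does go through 
the same read/write lock. `LockGuardedEntities` and its `String` entities are 
hypothetical stand-ins, not the actual `ClientQuotaManager` code: 

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for the manager; entities are plain strings here.
class LockGuardedEntities {
  private val lock = new ReentrantReadWriteLock()
  // A plain, non-concurrent collection: safe only because every access
  // below holds the read or write lock.
  private val active = mutable.HashSet.empty[String]

  def update(entity: String, present: Boolean): Unit = {
    lock.writeLock().lock()
    try {
      if (present) active.add(entity) else active.remove(entity)
      ()
    } finally lock.writeLock().unlock()
  }

  def snapshot: Set[String] = {
    lock.readLock().lock()
    try active.toSet // immutable copy taken while holding the read lock
    finally lock.readLock().unlock()
  }
}
```

   If any code path reads the collection without taking the lock, the 
concurrent collection is still needed. 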



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -194,9 +195,6 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
   /**
    * Returns true if any quotas are enabled for this quota manager. This is 
used
    * to determine if quota related metrics should be created.
-   * Note: If any quotas (static defaults, dynamic defaults or quota 
overrides) have
-   * been configured for this broker at any time for this quota type, 
quotasEnabled will
-   * return true until the next broker restart, even if all quotas are 
subsequently deleted.

Review Comment:
   It seems the original behaviour for quotas was: "Once a quota is on, it 
stays on until the next restart". This PR changes it to "Quotas are on as long 
as there is an active quota configuration". What were the assumptions behind 
the original behaviour? It seems odd to me. Why would we want disabling quotas 
to require a restart? 



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     case None => new DefaultQuotaCallback
   }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+  private val activeQuotaEntities = new ConcurrentHashMap[KafkaQuotaEntity, 
Boolean]()
 
   @volatile
   private var quotaTypesEnabled = clientQuotaCallbackPlugin match {

Review Comment:
   Nit: Since we're doing a lot of bit manipulation on this value, it could 
make sense to convert it to an [enum 
set](https://docs.oracle.com/javase/8/docs/api/java/util/EnumSet.html). This 
would provide an efficient bit-vector interface and allow explicit 
"get/set/remove" operations. But feel free to ignore this comment if it doesn't 
make sense in Scala. 
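
   For illustration only: `EnumSet` needs a Java `enum`, so if the flags stay 
in Scala, one possible analogue is `Enumeration#ValueSet`, which is also backed 
by a bit set. The names `QuotaFlag` and `QuotaFlagsSketch` are hypothetical and 
not part of this PR: 

```scala
// Hypothetical flag enumeration standing in for the Int constants in QuotaTypes.
object QuotaFlag extends Enumeration {
  val ClientId, User, UserClientId, Custom = Value
}

object QuotaFlagsSketch {
  // Builds a flag set with explicit add/remove calls instead of |= on an Int.
  def demo(): QuotaFlag.ValueSet = {
    var enabled = QuotaFlag.ValueSet.empty
    enabled += QuotaFlag.User      // "set"
    enabled += QuotaFlag.ClientId
    enabled -= QuotaFlag.ClientId  // "remove"
    enabled
  }
}
```

   A membership check then reads `enabled.contains(QuotaFlag.User)` rather than 
a mask comparison. 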



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +427,19 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     try {
       val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-      if (userEntity.nonEmpty) {
-        if (quotaEntity.clientIdEntity.nonEmpty)
-          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-        else
-          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-      } else if (clientEntity.nonEmpty)
-        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
       quota match {
-        case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, 
quotaEntity, newQuota.bound)
-        case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+        case Some(newQuota) =>
+          quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
newQuota.bound)
+          if(!activeQuotaEntities.put(quotaEntity, true)){

Review Comment:
   Nit: do we ever insert a `false` value? If not, consider using a `HashSet` 
instead. 
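
   A small sketch of what that could look like, assuming access is already 
serialized by the write lock. `ActiveEntitiesSketch` and the `String` keys are 
hypothetical. With a set, `add` and `remove` report directly whether membership 
changed, so there is no need to interpret the previous value returned by 
`put`: 

```scala
import scala.collection.mutable

object ActiveEntitiesSketch {
  // Returns (firstAdd, secondAdd, removed) to show the Boolean results.
  def demo(): (Boolean, Boolean, Boolean) = {
    val active = mutable.HashSet.empty[String]
    val firstAdd = active.add("user-a")    // newly added
    val secondAdd = active.add("user-a")   // already present
    val removed = active.remove("user-a")  // was present
    (firstAdd, secondAdd, removed)
  }
}
```

   If a concurrent collection turns out to be required after all, 
`ConcurrentHashMap.newKeySet()` offers the same set interface. 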



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +451,37 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    quotaTypesEnabled =  if (clientQuotaCallbackPlugin.isDefined) {
+        QuotaTypes.CustomQuotas
+      } else {
+        QuotaTypes.NoQuotas
+      }
+
+    activeQuotaEntities.forEach { (entity, _) =>
+      entity match {
+        case KafkaQuotaEntity(Some(_), Some(_)) =>
+          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+        case KafkaQuotaEntity(Some(_), None) =>
+          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+        case KafkaQuotaEntity(None, Some(_)) =>
+          quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+        case _ => // Unexpected entity type
+      }
+    }
+
+    val activeEntities = if (activeQuotaEntities.isEmpty) "No active quota 
entities" else activeQuotaEntities.keys.asScala.map(_.toString).mkString(", ")
+    info(s"Quota types enabled changed to $quotaTypesEnabled with active quota 
entities: [$activeEntities]")

Review Comment:
   How will this appear in logs? It may not be very useful if only a number is 
written out. If we're logging this information, I suggest we also write a 
"stringification" of the value so that it's clear which quota types are 
enabled. Or, if it carries the same information as `activeEntities`, omit it. 

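   One possible shape for such a helper, as a sketch only. The flag values 
below are assumed for illustration and should be taken from `QuotaTypes`; the 
`describe` method is hypothetical: 

```scala
object QuotaTypesDescribe {
  // Assumed flag values for illustration; the real ones live in QuotaTypes.
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4
  val CustomQuotas = 8

  private val names = Seq(
    ClientIdQuotaEnabled -> "ClientIdQuotaEnabled",
    UserQuotaEnabled -> "UserQuotaEnabled",
    UserClientIdQuotaEnabled -> "UserClientIdQuotaEnabled",
    CustomQuotas -> "CustomQuotas"
  )

  // Turns the bitmask into a readable list of the flags that are set.
  def describe(flags: Int): String = {
    val enabled = names.collect { case (bit, name) if (flags & bit) != 0 => name }
    if (enabled.isEmpty) "NoQuotas" else enabled.mkString("|")
  }
}
```

   The log line could then include `describe(quotaTypesEnabled)` next to, or 
instead of, the raw number. 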


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
