josefk31 commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2109435789
########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -17,16 +17,21 @@ package kafka.server import kafka.server.ClientQuotaManager.BaseUserEntity +import org.apache.kafka.common.Cluster +import java.{lang, util} Review Comment: Do we need such a broad import statement? It seems like the pattern in the rest of the imports is to bring in only the classes which are used. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case None => new DefaultQuotaCallback } private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) + private val activeQuotaEntities = new ConcurrentHashMap[KafkaQuotaEntity, Boolean]() Review Comment: Does this need to be a `ConcurrentHashMap` if the only place it is updated / read is within the `updateQuota` method which is already guarded by write `lock`? In theory; while the write-lock is aquired no other thread should be able to aquire a read-lock. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -194,9 +195,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, /** * Returns true if any quotas are enabled for this quota manager. This is used * to determine if quota related metrics should be created. - * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have - * been configured for this broker at any time for this quota type, quotasEnabled will - * return true until the next broker restart, even if all quotas are subsequently deleted. Review Comment: It seems the original behaviour for quota was: "When once quota is on, it is on until next restart". This PR changes it to "Quota is on as long as there is an active quota configuration". What were the assumptions behind the original behaviour - as the original behaviour seems "odd" to me. Why would we want to make disabling quota require a restart? ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case None => new DefaultQuotaCallback } private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) + private val activeQuotaEntities = new ConcurrentHashMap[KafkaQuotaEntity, Boolean]() @volatile private var quotaTypesEnabled = clientQuotaCallbackPlugin match { Review Comment: Nit: Since we're doing a lot of bit-manipulation on this value, it could make sense to convert it to an [enum set](https://docs.oracle.com/javase/8/docs/api/java/util/EnumSet.html). This would provide an efficient bit-vector interface and allow explicit "get/set/remove" operations. But feel free to ignore this comment if it doesn't make sense in scala. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -428,18 +427,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { - if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled - else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - quota match { - case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) - case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) + case Some(newQuota) => + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + if(!activeQuotaEntities.put(quotaEntity, true)){ Review Comment: Nit; do we ever insert a `false` value? If not, consider using a `HashSet` instead. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +451,37 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types. + * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + */ + private def updateQuotaTypes(): Unit = { + quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) { + QuotaTypes.CustomQuotas + } else { + QuotaTypes.NoQuotas + } + + activeQuotaEntities.forEach { (entity, _) => + entity match { + case KafkaQuotaEntity(Some(_), Some(_)) => + quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled + case KafkaQuotaEntity(Some(_), None) => + quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled + case KafkaQuotaEntity(None, Some(_)) => + quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled + case _ => // Unexpected entity type + } + } + + val activeEntities = if (activeQuotaEntities.isEmpty) "No active quota entities" else activeQuotaEntities.keys.asScala.map(_.toString).mkString(", ") + info(s"Quota types enabled changed to $quotaTypesEnabled with active quota entities: [$activeEntities]") Review Comment: How will this appear in logs? It may not be very useful if it's just a number being written out. If we're outputting this information I suggest we also write a "stringification" of this value so that it's clear what `quota-types`are enabled. Or if it's emitting the same information as "activeEntities", then omit it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org