mimaison commented on code in PR #18196:
URL: https://github.com/apache/kafka/pull/18196#discussion_r1942979063
##########
core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala:
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package kafka.server.metadata
+
+import kafka.server.{KafkaConfig, MetadataCache}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.server.fault.FaultHandler
+
+/**
+ * Publishing dynamic topic or cluster changes to the client quota manager.
+ * Temporary solution since Cluster objects are immutable and costly to update for every metadata change.
+ * See KAFKA-18239 to trace the issue.
+ */
+class DynamicTopicClusterQuotaPublisher (
+  clusterId: String,
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  quotaManagers: QuotaManagers
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}"
+
+  override def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+    manifest: LoaderManifest
+  ): Unit = {
+    onMetadataUpdate(delta, newImage)
+  }
+
+  def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+  ): Unit = {
+    val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+    try {
+      quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
+        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+          val cluster = MetadataCache.toCluster(clusterId, newImage)
+          if (clientQuotaCallback.updateClusterMetadata(cluster)) {
+            quotaManagers.fetch.updateQuotaMetricConfigs()
+            quotaManagers.produce.updateQuotaMetricConfigs()
+            quotaManagers.request.updateQuotaMetricConfigs()
+            quotaManagers.controllerMutation.updateQuotaMetricConfigs()
+          }
+        }
+      })
+    } catch {
+      case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
+        s"publishing dynamic topic or cluster changes from $deltaName", t)
+    }
+  }
+}

Review Comment:
   Nit: missing new line

##########
core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala:
##########
@@ -0,0 +1,69 @@
+  def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+  ): Unit = {
+    val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"

Review Comment:
   Can we only compute this val in the `catch` block?
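For illustration, a minimal sketch of the change being requested here (the same overload from the diff above, with the `deltaName` interpolation moved onto the failure path so it is not built on every metadata update):

```scala
def onMetadataUpdate(
  delta: MetadataDelta,
  newImage: MetadataImage,
): Unit = {
  try {
    quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
      if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
        val cluster = MetadataCache.toCluster(clusterId, newImage)
        if (clientQuotaCallback.updateClusterMetadata(cluster)) {
          quotaManagers.fetch.updateQuotaMetricConfigs()
          quotaManagers.produce.updateQuotaMetricConfigs()
          quotaManagers.request.updateQuotaMetricConfigs()
          quotaManagers.controllerMutation.updateQuotaMetricConfigs()
        }
      }
    })
  } catch {
    case t: Throwable =>
      // The string is now only interpolated when a fault is actually reported.
      val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
      faultHandler.handleFault("Uncaught exception while " +
        s"publishing dynamic topic or cluster changes from $deltaName", t)
  }
}
```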
##########
clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java:
##########
@@ -89,11 +89,11 @@ public interface ClientQuotaCallback extends Configurable {
     boolean quotaResetRequired(ClientQuotaType quotaType);
 
     /**
-     * Metadata update callback that is invoked whenever UpdateMetadata request is received from
-     * the controller. This is useful if quota computation takes partitions into account.
+     * Metadata update callback that is invoked whenever the topic and cluster delta changed.

Review Comment:
   I'm not sure we want to mention cluster/topic deltas here, this is a public API and part of our javadoc. I think we can say something like:
   `This callback is invoked whenever the cluster metadata changes. This includes brokers added or removed, topics created or deleted, and partition leadership changes.`

##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -144,4 +149,95 @@ object MetadataCache {
   ): KRaftMetadataCache = {
     new KRaftMetadataCache(brokerId, kraftVersionSupplier)
   }
+
+  def toCluster(clusterId: String, image: MetadataImage): Cluster = {
+    val brokerToNodes = new util.HashMap[Integer, java.util.List[Node]]

Review Comment:
   We can drop the `java.` prefix in `java.util.List`
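As a sketch of that suggestion, assuming `import java.util` is already in scope in `MetadataCache.scala` (which the bare `util.HashMap` on the same line implies):

```scala
// With `import java.util` in scope, the value type can use the same short form
// as the HashMap constructor:
val brokerToNodes = new util.HashMap[Integer, util.List[Node]]
```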
##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -144,4 +149,95 @@ object MetadataCache {
   ): KRaftMetadataCache = {
     new KRaftMetadataCache(brokerId, kraftVersionSupplier)
   }
+
+  def toCluster(clusterId: String, image: MetadataImage): Cluster = {
+    val brokerToNodes = new util.HashMap[Integer, java.util.List[Node]]
+    image.cluster().brokers()
+      .values().stream()
+      .filter(broker => !broker.fenced())

Review Comment:
   -> `.filter(!_.fenced())` but maybe you wrote it this way with the future conversion to Java in mind. In that case we can keep the code.

##########
clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java:
##########
@@ -89,11 +89,11 @@ public interface ClientQuotaCallback extends Configurable {
     boolean quotaResetRequired(ClientQuotaType quotaType);
 
     /**
-     * Metadata update callback that is invoked whenever UpdateMetadata request is received from
-     * the controller. This is useful if quota computation takes partitions into account.
+     * Metadata update callback that is invoked whenever the topic and cluster delta changed.
+     * This is useful if quota computation takes partitions into account.
      * Topics that are being deleted will not be included in `cluster`.
      *
-     * @param cluster Cluster metadata including partitions and their leaders if known
+     * @param cluster Cluster metadata including topic and cluster

Review Comment:
   I'm not sure I understand this change. I prefer the previous message.

##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -144,4 +149,95 @@ object MetadataCache {
   ): KRaftMetadataCache = {
     new KRaftMetadataCache(brokerId, kraftVersionSupplier)
   }
+
+  def toCluster(clusterId: String, image: MetadataImage): Cluster = {
+    val brokerToNodes = new util.HashMap[Integer, java.util.List[Node]]
+    image.cluster().brokers()
+      .values().stream()
+      .filter(broker => !broker.fenced())
+      .forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) }
+
+    def getNodes(id: Int): java.util.List[Node] = brokerToNodes.get(id)

Review Comment:
   Again drop the `java.` prefix

##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -87,11 +90,25 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
   override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
     super.configureSecurityBeforeServersStart(testInfo)
+  }
+
+  override def configureSecurityAfterServersStart(): Unit = {
+    super.configureSecurityAfterServersStart()
     createScramCredentials(createAdminClient(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
   }
 
+  override def addFormatterSettings(formatter: Formatter): Unit = {
+    formatter.setScramArguments(
+      List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
+  }
+
+  override def createPrivilegedAdminClient() = {
+    createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
+      kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))

Review Comment:
   From my limited testing it looks like this test works with the new consumer too. Is there a reason you used `getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly` instead of `getTestQuorumAndGroupProtocolParametersAll`?

##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -87,11 +90,25 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
   override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
     super.configureSecurityBeforeServersStart(testInfo)
+  }

Review Comment:
   Do we still need to keep `configureSecurityBeforeServersStart()` if all it does is call the `super` implementation?

##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -87,11 +90,25 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
   override def addFormatterSettings(formatter: Formatter): Unit = {
+    formatter.setScramArguments(
+      List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)

Review Comment:
   We could use `util.List.of()` instead of `asJava` here
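A sketch of that suggestion, assuming `import java.util` is in scope in the test file (`java.util.List.of` itself is standard since Java 9, so it is available on the JDKs Kafka builds against):

```scala
override def addFormatterSettings(formatter: Formatter): Unit = {
  // util.List.of builds an immutable Java list directly, so no Scala-to-Java
  // conversion (and no scala.jdk.CollectionConverters import) is needed.
  formatter.setScramArguments(
    util.List.of(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]"))
}
```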
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org