junrao commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1378220184
########## core/src/main/scala/kafka/server/ConfigHelper.scala: ########## @@ -129,6 +130,25 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name) .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id) .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)) + + case ConfigResource.Type.CLIENT_METRICS => + val entityName = resource.resourceName + if (resource.resourceName == null || resource.resourceName.isEmpty) { Review Comment: Could we use `entityName`? ########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. + */ +public class ClientMetricsManager { Review Comment: Do we need to close ClientMetricsManager on shutdown? ########## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Client metric configuration related parameters and the supporting methods like validation, etc. are + * defined in this class. + * <p> + * { + * <ul> + * <li> name: Name supplied by CLI during the creation of the client metric subscription. + * <li> metrics: List of metric prefixes + * <li> intervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + * <li> match: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * </ul> + * } + * <p> + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * <ul> + * <li> "name" is a unique name for the subscription. This is used to identify the subscription in + * the broker. Ex: "METRICS-SUB" + * <li> "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * <li> "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. + * </ul> + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +public class ClientMetricsConfigs { + + public static final String SUBSCRIPTION_METRICS = "metrics"; + public static final String PUSH_INTERVAL_MS = "interval.ms"; + public static final String CLIENT_MATCH_PATTERN = "match"; + + public static final String CLIENT_INSTANCE_ID = "client_instance_id"; + public static final String CLIENT_ID = "client_id"; + public static final String CLIENT_SOFTWARE_NAME = "client_software_name"; + public static final String CLIENT_SOFTWARE_VERSION = "client_software_version"; + public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; + public static final String CLIENT_SOURCE_PORT = "client_source_port"; + + public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + private static final int MIN_INTERVAL_MS = 100; // 100ms + private static final int MAX_INTERVAL_MS = 3600000; // 1 hour + + private static final Set<String> ALLOWED_MATCH_PARAMS = new HashSet<>(Arrays.asList( + CLIENT_INSTANCE_ID, + CLIENT_ID, + CLIENT_SOFTWARE_NAME, + CLIENT_SOFTWARE_VERSION, + CLIENT_SOURCE_ADDRESS, + CLIENT_SOURCE_PORT + )); + + private static final ConfigDef CONFIG = new ConfigDef() + .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, "Subscription metrics list") + .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval in milliseconds") + .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client match pattern list"); + + public static ConfigDef configDef() { + return CONFIG; + } + + public static Set<String> names() { + return CONFIG.names(); + } + + public static void validate(String subscriptionName, Properties properties) { + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new InvalidRequestException("Subscription name can't be empty"); + } + + validateProperties(properties); + } + + private static void validateProperties(Properties properties) { + // Make sure that all the properties are valid + properties.forEach((key, value) -> { + if (!names().contains(key)) { + throw new InvalidRequestException("Unknown client metrics configuration: " + key); + } + }); + + // Make sure that push interval is between 100ms and 1 hour. + if (properties.containsKey(PUSH_INTERVAL_MS)) { + int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); + if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { + throw new InvalidRequestException("Invalid value for interval.ms, interval must be between 100 and 3600000 (1 hour)"); Review Comment: Could we include `pushIntervalMs` in the error message? ########## core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala: ########## @@ -101,4 +102,55 @@ class ControllerConfigurationValidatorTest { assertThrows(classOf[InvalidRequestException], () => validator.validate( new ConfigResource(BROKER, "-1"), config)). getMessage()) } + + @Test + def testValidClientMetricsConfig(): Unit = { + val config = new TreeMap[String, String]() + config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000") + config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency") + config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" + + ",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," + Review Comment: We use `apache-kafka-java` here, but `java` in `ClientMetricsTestUtils`. What's the standard name? ########## core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +public class ClientMetricsTestUtils { + + public static final String DEFAULT_METRICS = + "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency"; Review Comment: extra dot at the end of org.apache.kafka.client.producer.partition.queue.? ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -4089,13 +4119,19 @@ public void testIncrementalAlterConfigs() throws Exception { new ConfigEntry("compression.type", "gzip"), AlterConfigOp.OpType.APPEND); + AlterConfigOp alterConfigOp3 = new AlterConfigOp( Review Comment: This doesn't seem to be a valid config for CLIENT_METRICS? ########## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Client metric configuration related parameters and the supporting methods like validation, etc. are + * defined in this class. + * <p> + * { + * <ul> + * <li> name: Name supplied by CLI during the creation of the client metric subscription. + * <li> metrics: List of metric prefixes + * <li> intervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + * <li> match: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * </ul> + * } + * <p> + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * <ul> + * <li> "name" is a unique name for the subscription. This is used to identify the subscription in + * the broker. Ex: "METRICS-SUB" + * <li> "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * <li> "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. + * </ul> + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +public class ClientMetricsConfigs { + + public static final String SUBSCRIPTION_METRICS = "metrics"; + public static final String PUSH_INTERVAL_MS = "interval.ms"; + public static final String CLIENT_MATCH_PATTERN = "match"; + + public static final String CLIENT_INSTANCE_ID = "client_instance_id"; + public static final String CLIENT_ID = "client_id"; + public static final String CLIENT_SOFTWARE_NAME = "client_software_name"; + public static final String CLIENT_SOFTWARE_VERSION = "client_software_version"; + public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; + public static final String CLIENT_SOURCE_PORT = "client_source_port"; + + public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + private static final int MIN_INTERVAL_MS = 100; // 100ms + private static final int MAX_INTERVAL_MS = 3600000; // 1 hour + + private static final Set<String> ALLOWED_MATCH_PARAMS = new HashSet<>(Arrays.asList( + CLIENT_INSTANCE_ID, + CLIENT_ID, + CLIENT_SOFTWARE_NAME, + CLIENT_SOFTWARE_VERSION, + CLIENT_SOURCE_ADDRESS, + CLIENT_SOURCE_PORT + )); + + private static final ConfigDef CONFIG = new ConfigDef() + .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, "Subscription metrics list") + .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval in milliseconds") + .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client match pattern list"); + + public static ConfigDef configDef() { + return CONFIG; + } + + public static Set<String> names() { + return CONFIG.names(); + } + + public static void validate(String subscriptionName, Properties properties) { + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new InvalidRequestException("Subscription name can't be empty"); + } + + validateProperties(properties); + } + + private static void validateProperties(Properties properties) { + // Make sure that all the properties are valid + properties.forEach((key, value) -> { + if (!names().contains(key)) { + throw new InvalidRequestException("Unknown client metrics configuration: " + key); + } + }); + + // Make sure that push interval is between 100ms and 1 hour. + if (properties.containsKey(PUSH_INTERVAL_MS)) { + int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); + if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { + throw new InvalidRequestException("Invalid value for interval.ms, interval must be between 100 and 3600000 (1 hour)"); + } + } + + // Make sure that client match patterns are valid by parsing them. + if (properties.containsKey(CLIENT_MATCH_PATTERN)) { + List<String> patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(",")); + // Parse the client matching patterns to validate if the patterns are valid. + parseMatchingPatterns(patterns); + } + } + + /** + * Parses the client matching patterns and builds a map with entries that has + * (PatternName, PatternValue) as the entries. + * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) + * <p> + * NOTES: + * Client match pattern splits the input into two parts separated by first occurrence of the character '=' + * + * @param patterns List of client matching pattern strings + * @return map of client matching pattern entries + */ + public static Map<String, String> parseMatchingPatterns(List<String> patterns) { + Map<String, String> patternsMap = new HashMap<>(); + if (patterns != null) { + patterns.forEach(pattern -> { + String[] nameValuePair = pattern.split("="); + if (nameValuePair.length == 2 && isValidParam(nameValuePair[0].trim()) && Review Comment: Could we avoid calling `nameValuePair[0].trim()` twice? ########## core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala: ########## @@ -101,4 +102,55 @@ class ControllerConfigurationValidatorTest { assertThrows(classOf[InvalidRequestException], () => validator.validate( new ConfigResource(BROKER, "-1"), config)). getMessage()) } + + @Test + def testValidClientMetricsConfig(): Unit = { + val config = new TreeMap[String, String]() + config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000") + config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency") + config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" + + ",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," + + "client_source_port=1234") + validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), config) + } + + @Test + def testInvalidSubscriptionIdClientMetricsConfig(): Unit = { Review Comment: It's testing invalid subscription name instead of invalid subscription id. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -106,7 +106,8 @@ class KafkaApis(val requestChannel: RequestChannel, val clusterId: String, time: Time, val tokenManager: DelegationTokenManager, - val apiVersionManager: ApiVersionManager + val apiVersionManager: ApiVersionManager, + val clientMetricsManager: ClientMetricsManager Review Comment: Will `clientMetricsManager` be used in future PRs? ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2619,6 +2619,9 @@ private ConfigEntry.ConfigSource configSource(DescribeConfigsResponse.ConfigSour case DYNAMIC_BROKER_LOGGER_CONFIG: configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG; break; + case CLIENT_METRICS_CONFIG: Review Comment: Hmm, this method seems very specific to get/create topic. Do we really need to add client metric here? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -477,6 +479,128 @@ class KafkaApisTest { testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder) } + @Test + def testAlterConfigsClientMetrics(): Unit = { + val subscriptionName = "client_metric_subscription_1" + val authorizedResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, + Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) + + val props = ClientMetricsTestUtils.getDefaultProperties + val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]() + props.forEach((x, y) => + configEntries.add(new AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String], y.asInstanceOf[String]))) + + val configs = Map(authorizedResource -> new AlterConfigsRequest.Config(configEntries)) + + val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0) + val request = buildRequest( + new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)) + + when(controller.isActive).thenReturn(false) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false))) + .thenReturn(Map(authorizedResource -> ApiError.NONE)) + + createKafkaApis(authorizer = Some(authorizer)).handleAlterConfigsRequest(request) + val response = verifyNoThrottling[AlterConfigsResponse](request) + verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE)) + verify(authorizer, times(1)).authorize(any(), any()) + verify(adminManager).alterConfigs(any(), anyBoolean()) + } + + @Test + def testIncrementalClientMetricAlterConfigs(): Unit = { + val authorizer: Authorizer = mock(classOf[Authorizer]) + + val subscriptionName = "client_metric_subscription_1" + val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName) + + authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, + Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) + + val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, + ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0) + + val incrementalAlterConfigsRequest = getIncrementalClientMetricsAlterConfigRequestBuilder( + Seq(resource)).build(requestHeader.apiVersion) + val request = buildRequest(incrementalAlterConfigsRequest, + fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + + when(controller.isActive).thenReturn(true) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false))) + .thenReturn(Map(resource -> ApiError.NONE)) + + createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request) + val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request) + verifyIncrementalAlterConfigResult(response, Map(subscriptionName -> Errors.NONE )) + verify(authorizer, times(1)).authorize(any(), any()) + verify(adminManager).incrementalAlterConfigs(any(), anyBoolean()) + } + + private def getIncrementalClientMetricsAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = { + val resourceMap = configResources.map(configResource => { + val entryToBeModified = new ConfigEntry("metrics", "foo.bar") + configResource -> Set(new AlterConfigOp(entryToBeModified, OpType.SET)).asJavaCollection + }).toMap.asJava + new IncrementalAlterConfigsRequest.Builder(resourceMap, false) + } + + @Test + def testDescribeConfigsClientMetrics(): Unit = { + val authorizer: Authorizer = mock(classOf[Authorizer]) + val operation = AclOperation.DESCRIBE_CONFIGS + val resourceType = ResourceType.CLUSTER + val subscriptionName = "client_metric_subscription_1" + val requestHeader = + new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0) + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, Resource.CLUSTER_NAME, PatternType.LITERAL), + 1, true, true) + ) + // Verify that authorize is only called once Review Comment: The verification happens later in line 593. ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -567,7 +572,8 @@ class KafkaServer( ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers), - ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas)) + ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas), + ConfigType.ClientMetrics -> new ClientMetricsConfigHandler(clientMetricsManager)) Review Comment: Do we need this since the KIP says the feature is only supported in KRaft mode? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org