junrao commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1378220184


##########
core/src/main/scala/kafka/server/ConfigHelper.scala:
##########
@@ -129,6 +130,25 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
+
+          case ConfigResource.Type.CLIENT_METRICS =>
+            val entityName = resource.resourceName
+            if (resource.resourceName == null || resource.resourceName.isEmpty) {

Review Comment:
   Could we use `entityName` here instead of repeating `resource.resourceName`?



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and instance information.
+ */
+public class ClientMetricsManager {

Review Comment:
   Do we need to close ClientMetricsManager on shutdown?
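
   If the manager ends up owning resources (caches, timers, threads), one option would be to make it `AutoCloseable` and call `close()` from the broker shutdown path. A minimal sketch of that shape, assuming a hypothetical stand-in class `ClientMetricsManagerSketch` and an illustrative `isClosed()` helper, neither of which is in this PR:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ClientMetricsManager; not the class from this PR.
final class ClientMetricsManagerSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Illustrative helper so callers (and tests) can observe the state.
    boolean isClosed() {
        return closed.get();
    }

    @Override
    public void close() {
        // compareAndSet makes close() idempotent: only the first call
        // would release resources, later calls are no-ops.
        if (closed.compareAndSet(false, true)) {
            // Release subscriptions, caches, timers, etc. here.
        }
    }
}
```

   With this shape the shutdown code can call `close()` unconditionally, even if it is reached more than once.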



##########
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation, etc. are
+ * defined in this class.
+ * <p>
+ * {
+ * <ul>
+ *   <li> name: Name supplied by CLI during the creation of the client metric 
subscription.
+ *   <li> metrics: List of metric prefixes
+ *   <li> intervalMs: A positive integer value >=0  tells the client that how 
often a client can push the metrics
+ *   <li> match: List of client matching patterns, that are used by broker to 
match the client instance
+ *   with the subscription.
+ * </ul>
+ * }
+ * <p>
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * <ul>
+ *    <li> "name" is a unique name for the subscription. This is used to 
identify the subscription in
+ *          the broker. Ex: "METRICS-SUB"
+ *    <li> "metrics" value should be comma separated metrics list. A prefix 
match on the requested metrics
+ *          is performed in clients to determine subscribed metrics. An empty 
list means no metrics subscribed.
+ *          A list containing just an empty string means all metrics 
subscribed.
+ *          Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ *    <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is 
the interval at which the client
+ *          should push the metrics to the broker.
+ *
+ *    <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *          pattern specified then broker considers that as all match which 
means the associated metrics
+ *          applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *          which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ * </ul>
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+public class ClientMetricsConfigs {
+
+    public static final String SUBSCRIPTION_METRICS = "metrics";
+    public static final String PUSH_INTERVAL_MS = "interval.ms";
+    public static final String CLIENT_MATCH_PATTERN = "match";
+
+    public static final String CLIENT_INSTANCE_ID = "client_instance_id";
+    public static final String CLIENT_ID = "client_id";
+    public static final String CLIENT_SOFTWARE_NAME = "client_software_name";
+    public static final String CLIENT_SOFTWARE_VERSION = 
"client_software_version";
+    public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
+    public static final String CLIENT_SOURCE_PORT = "client_source_port";
+
+    public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+    private static final int MIN_INTERVAL_MS = 100; // 100ms
+    private static final int MAX_INTERVAL_MS = 3600000; // 1 hour
+
+    private static final Set<String> ALLOWED_MATCH_PARAMS = new 
HashSet<>(Arrays.asList(
+        CLIENT_INSTANCE_ID,
+        CLIENT_ID,
+        CLIENT_SOFTWARE_NAME,
+        CLIENT_SOFTWARE_VERSION,
+        CLIENT_SOURCE_ADDRESS,
+        CLIENT_SOURCE_PORT
+    ));
+
+    private static final ConfigDef CONFIG = new ConfigDef()
+        .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, 
"Subscription metrics list")
+        .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval 
in milliseconds")
+        .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client 
match pattern list");
+
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
+    public static Set<String> names() {
+        return CONFIG.names();
+    }
+
+    public static void validate(String subscriptionName, Properties 
properties) {
+        if (subscriptionName == null || subscriptionName.isEmpty()) {
+            throw new InvalidRequestException("Subscription name can't be 
empty");
+        }
+
+        validateProperties(properties);
+    }
+
+    private static void validateProperties(Properties properties) {
+        // Make sure that all the properties are valid
+        properties.forEach((key, value) -> {
+            if (!names().contains(key)) {
+                throw new InvalidRequestException("Unknown client metrics 
configuration: " + key);
+            }
+        });
+
+        // Make sure that push interval is between 100ms and 1 hour.
+        if (properties.containsKey(PUSH_INTERVAL_MS)) {
+            int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
+            if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
+                throw new InvalidRequestException("Invalid value for interval.ms, interval must be between 100 and 3600000 (1 hour)");

Review Comment:
   Could we include `pushIntervalMs` in the error message?
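
   Something along these lines would surface the rejected value. This is only a sketch: the class name `PushIntervalValidationSketch` is made up, and `IllegalArgumentException` stands in for Kafka's `InvalidRequestException` so the snippet compiles on its own.

```java
import java.util.Properties;

// Hypothetical, self-contained version of the interval check from the diff.
final class PushIntervalValidationSketch {
    static final String PUSH_INTERVAL_MS = "interval.ms";
    static final int MIN_INTERVAL_MS = 100;      // 100ms
    static final int MAX_INTERVAL_MS = 3600000;  // 1 hour

    static void validatePushInterval(Properties properties) {
        if (!properties.containsKey(PUSH_INTERVAL_MS)) {
            return;
        }
        int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
        if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
            // The offending value and the bounds both come from variables, so the
            // message cannot drift from the constants.
            // IllegalArgumentException is a stand-in for InvalidRequestException.
            throw new IllegalArgumentException("Invalid value " + pushIntervalMs + " for "
                + PUSH_INTERVAL_MS + ", interval must be between " + MIN_INTERVAL_MS
                + " and " + MAX_INTERVAL_MS + " (1 hour)");
        }
    }
}
```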



##########
core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala:
##########
@@ -101,4 +102,55 @@ class ControllerConfigurationValidatorTest {
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
         new ConfigResource(BROKER, "-1"), config)). getMessage())
   }
+
+  @Test
+  def testValidClientMetricsConfig(): Unit = {
+    val config = new TreeMap[String, String]()
+    config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000")
+    config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
+    config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
+      ",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," +

Review Comment:
   We use `apache-kafka-java` here, but `java` in `ClientMetricsTestUtils`. What's the standard name?



##########
core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+public class ClientMetricsTestUtils {
+
+    public static final String DEFAULT_METRICS =
+        "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency";

Review Comment:
   Is there an extra dot at the end of `org.apache.kafka.client.producer.partition.queue.`?
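
   For context, the class Javadoc says the `metrics` entries are prefixes, that an empty list subscribes to nothing, and that a list holding a single empty string subscribes to everything. A rough sketch of that behavior, assuming plain `String.startsWith` semantics (the class name and the metric names used with it are illustrative, not taken from the PR):

```java
import java.util.List;

// Hypothetical illustration of prefix-based metric subscription matching.
final class MetricPrefixMatchSketch {
    // Returns true if metricName starts with any of the subscribed prefixes.
    // An empty list matches nothing; a list containing "" matches everything,
    // because every string starts with the empty string.
    static boolean isSubscribed(List<String> prefixes, String metricName) {
        for (String prefix : prefixes) {
            if (metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

   Under that assumption a trailing dot narrows the match: the prefix `a.b.queue.` matches `a.b.queue.size` but not `a.b.queue_depth`, while `a.b.queue` would match both.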



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4089,13 +4119,19 @@ public void testIncrementalAlterConfigs()  throws Exception {
                     new ConfigEntry("compression.type", "gzip"),
                     AlterConfigOp.OpType.APPEND);
 
+            AlterConfigOp alterConfigOp3 = new AlterConfigOp(

Review Comment:
   This doesn't seem to be a valid config for CLIENT_METRICS?



##########
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation, etc. are
+ * defined in this class.
+ * <p>
+ * {
+ * <ul>
+ *   <li> name: Name supplied by CLI during the creation of the client metric 
subscription.
+ *   <li> metrics: List of metric prefixes
+ *   <li> intervalMs: A positive integer value >=0  tells the client that how 
often a client can push the metrics
+ *   <li> match: List of client matching patterns, that are used by broker to 
match the client instance
+ *   with the subscription.
+ * </ul>
+ * }
+ * <p>
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * <ul>
+ *    <li> "name" is a unique name for the subscription. This is used to 
identify the subscription in
+ *          the broker. Ex: "METRICS-SUB"
+ *    <li> "metrics" value should be comma separated metrics list. A prefix 
match on the requested metrics
+ *          is performed in clients to determine subscribed metrics. An empty 
list means no metrics subscribed.
+ *          A list containing just an empty string means all metrics 
subscribed.
+ *          Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ *    <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is 
the interval at which the client
+ *          should push the metrics to the broker.
+ *
+ *    <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *          pattern specified then broker considers that as all match which 
means the associated metrics
+ *          applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *          which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ * </ul>
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+public class ClientMetricsConfigs {
+
+    public static final String SUBSCRIPTION_METRICS = "metrics";
+    public static final String PUSH_INTERVAL_MS = "interval.ms";
+    public static final String CLIENT_MATCH_PATTERN = "match";
+
+    public static final String CLIENT_INSTANCE_ID = "client_instance_id";
+    public static final String CLIENT_ID = "client_id";
+    public static final String CLIENT_SOFTWARE_NAME = "client_software_name";
+    public static final String CLIENT_SOFTWARE_VERSION = 
"client_software_version";
+    public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
+    public static final String CLIENT_SOURCE_PORT = "client_source_port";
+
+    public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+    private static final int MIN_INTERVAL_MS = 100; // 100ms
+    private static final int MAX_INTERVAL_MS = 3600000; // 1 hour
+
+    private static final Set<String> ALLOWED_MATCH_PARAMS = new 
HashSet<>(Arrays.asList(
+        CLIENT_INSTANCE_ID,
+        CLIENT_ID,
+        CLIENT_SOFTWARE_NAME,
+        CLIENT_SOFTWARE_VERSION,
+        CLIENT_SOURCE_ADDRESS,
+        CLIENT_SOURCE_PORT
+    ));
+
+    private static final ConfigDef CONFIG = new ConfigDef()
+        .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, 
"Subscription metrics list")
+        .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval 
in milliseconds")
+        .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client 
match pattern list");
+
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
+    public static Set<String> names() {
+        return CONFIG.names();
+    }
+
+    public static void validate(String subscriptionName, Properties 
properties) {
+        if (subscriptionName == null || subscriptionName.isEmpty()) {
+            throw new InvalidRequestException("Subscription name can't be 
empty");
+        }
+
+        validateProperties(properties);
+    }
+
+    private static void validateProperties(Properties properties) {
+        // Make sure that all the properties are valid
+        properties.forEach((key, value) -> {
+            if (!names().contains(key)) {
+                throw new InvalidRequestException("Unknown client metrics 
configuration: " + key);
+            }
+        });
+
+        // Make sure that push interval is between 100ms and 1 hour.
+        if (properties.containsKey(PUSH_INTERVAL_MS)) {
+            int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS));
+            if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) {
+                throw new InvalidRequestException("Invalid value for interval.ms, interval must be between 100 and 3600000 (1 hour)");
+            }
+        }
+
+        // Make sure that client match patterns are valid by parsing them.
+        if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
+            List<String> patterns = 
Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(","));
+            // Parse the client matching patterns to validate if the patterns 
are valid.
+            parseMatchingPatterns(patterns);
+        }
+    }
+
+    /**
+     * Parses the client matching patterns and builds a map with entries that 
has
+     * (PatternName, PatternValue) as the entries.
+     *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 
1.2.3)
+     * <p>
+     *  NOTES:
+     *  Client match pattern splits the input into two parts separated by 
first occurrence of the character '='
+     *
+     *  @param patterns List of client matching pattern strings
+     * @return map of client matching pattern entries
+     */
+    public static Map<String, String> parseMatchingPatterns(List<String> 
patterns) {
+        Map<String, String> patternsMap = new HashMap<>();
+        if (patterns != null) {
+            patterns.forEach(pattern -> {
+                String[] nameValuePair = pattern.split("=");
+                if (nameValuePair.length == 2 && isValidParam(nameValuePair[0].trim()) &&

Review Comment:
   Could we avoid calling `nameValuePair[0].trim()` twice?
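
   For example, the trimmed name could be held in a local variable. The sketch below is self-contained and only approximates the method: the rest of the original condition is cut off in the hunk above, so the validation here is limited to the allowed-name check, the class name is made up, and `IllegalArgumentException` stands in for Kafka's exception types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of parseMatchingPatterns.
final class MatchPatternParseSketch {
    private static final Set<String> ALLOWED_MATCH_PARAMS = new HashSet<>(Arrays.asList(
        "client_instance_id", "client_id", "client_software_name",
        "client_software_version", "client_source_address", "client_source_port"));

    static Map<String, String> parseMatchingPatterns(List<String> patterns) {
        Map<String, String> patternsMap = new HashMap<>();
        if (patterns == null) {
            return patternsMap;
        }
        for (String pattern : patterns) {
            String[] nameValuePair = pattern.split("=");
            if (nameValuePair.length != 2) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + pattern);
            }
            // Trim each side exactly once and reuse the results below.
            String name = nameValuePair[0].trim();
            String value = nameValuePair[1].trim();
            if (!ALLOWED_MATCH_PARAMS.contains(name)) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + pattern);
            }
            patternsMap.put(name, value);
        }
        return patternsMap;
    }
}
```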



##########
core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala:
##########
@@ -101,4 +102,55 @@ class ControllerConfigurationValidatorTest {
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
         new ConfigResource(BROKER, "-1"), config)). getMessage())
   }
+
+  @Test
+  def testValidClientMetricsConfig(): Unit = {
+    val config = new TreeMap[String, String]()
+    config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000")
+    config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, "org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
+    config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
+      ",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," +
+      "client_source_port=1234")
+    validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), config)
+  }
+
+  @Test
+  def testInvalidSubscriptionIdClientMetricsConfig(): Unit = {

Review Comment:
   This tests an invalid subscription name rather than an invalid subscription id.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -106,7 +106,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val clusterId: String,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
-                val apiVersionManager: ApiVersionManager
+                val apiVersionManager: ApiVersionManager,
+                val clientMetricsManager: ClientMetricsManager

Review Comment:
   Will `clientMetricsManager` be used in future PRs?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2619,6 +2619,9 @@ private ConfigEntry.ConfigSource configSource(DescribeConfigsResponse.ConfigSour
             case DYNAMIC_BROKER_LOGGER_CONFIG:
                 configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG;
                 break;
+            case CLIENT_METRICS_CONFIG:

Review Comment:
   Hmm, this method seems very specific to get/create topic. Do we really need to add client metrics here?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -477,6 +479,128 @@ class KafkaApisTest {
     testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
   }
 
+  @Test
+  def testAlterConfigsClientMetrics(): Unit = {
+    val subscriptionName = "client_metric_subscription_1"
+    val authorizedResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val props = ClientMetricsTestUtils.getDefaultProperties
+    val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]()
+    props.forEach((x, y) =>
+      configEntries.add(new AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String], y.asInstanceOf[String])))
+
+    val configs = Map(authorizedResource -> new AlterConfigsRequest.Config(configEntries))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0)
+    val request = buildRequest(
+      new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion))
+
+    when(controller.isActive).thenReturn(false)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
+      .thenReturn(Map(authorizedResource -> ApiError.NONE))
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterConfigsRequest(request)
+    val response = verifyNoThrottling[AlterConfigsResponse](request)
+    verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE))
+    verify(authorizer, times(1)).authorize(any(), any())
+    verify(adminManager).alterConfigs(any(), anyBoolean())
+  }
+
+  @Test
+  def testIncrementalClientMetricAlterConfigs(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+
+    val subscriptionName = "client_metric_subscription_1"
+    val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS,
+      ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
+
+    val incrementalAlterConfigsRequest = getIncrementalClientMetricsAlterConfigRequestBuilder(
+      Seq(resource)).build(requestHeader.apiVersion)
+    val request = buildRequest(incrementalAlterConfigsRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    when(controller.isActive).thenReturn(true)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false)))
+      .thenReturn(Map(resource -> ApiError.NONE))
+
+    createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
+    val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
+    verifyIncrementalAlterConfigResult(response, Map(subscriptionName -> Errors.NONE ))
+    verify(authorizer, times(1)).authorize(any(), any())
+    verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
+  }
+
+  private def getIncrementalClientMetricsAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
+    val resourceMap = configResources.map(configResource => {
+      val entryToBeModified = new ConfigEntry("metrics", "foo.bar")
+      configResource -> Set(new AlterConfigOp(entryToBeModified, OpType.SET)).asJavaCollection
+    }).toMap.asJava
+    new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
+  }
+
+  @Test
+  def testDescribeConfigsClientMetrics(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val operation = AclOperation.DESCRIBE_CONFIGS
+    val resourceType = ResourceType.CLUSTER
+    val subscriptionName = "client_metric_subscription_1"
+    val requestHeader =
+      new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, Resource.CLUSTER_NAME, PatternType.LITERAL),
+        1, true, true)
+    )
+    // Verify that authorize is only called once

Review Comment:
   The verification happens later in line 593.



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -567,7 +572,8 @@ class KafkaServer(
                                                            ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                            ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers),
-                                                           ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas))
+                                                           ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas),
+                                                           ConfigType.ClientMetrics -> new ClientMetricsConfigHandler(clientMetricsManager))

Review Comment:
   Do we need this since the KIP says the feature is only supported in KRaft mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
