AndrewJSchofield commented on code in PR #19493:
URL: https://github.com/apache/kafka/pull/19493#discussion_r2079364838


##########
clients/src/main/resources/common/message/ListConfigResourcesResponse.json:
##########
@@ -16,18 +16,22 @@
 {
   "apiKey": 74,
   "type": "response",
-  "name": "ListClientMetricsResourcesResponse",
-  "validVersions": "0",
+  "name": "ListConfigResourcesResponse",
+  // Version 0 is used as ListClientMetricsResourcesResponse which returns all 
client metrics resources.
+  // Version 1 adds ResourceType to ConfigResources (KIP-1142).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
       { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
         "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The error code, or 0 if there was no error." },
-      { "name": "ClientMetricsResources", "type": "[]ClientMetricsResource", 
"versions": "0+",
-        "about": "Each client metrics resource in the response.", "fields": [
-        { "name": "Name", "type": "string", "versions": "0+",
-          "about": "The resource name." }
+      { "name": "ConfigResources", "type": "[]ConfigResource", "versions": 
"0+",
+        "about": "Each config resource in the response.", "fields": [
+        { "name": "ResourceName", "type": "string", "versions": "0+",
+          "about": "The resource name." },
+        { "name": "ResourceType", "type": "int8", "versions": "1+", 
"ignorable": true,

Review Comment:
   Should this be `"default": "16"`? I'm still researching this, so that's an 
open question.



##########
clients/src/main/resources/common/message/ListConfigResourcesRequest.json:
##########
@@ -17,10 +17,14 @@
   "apiKey": 74,
   "type": "request",
   "listeners": ["broker"],
-  "name": "ListClientMetricsResourcesRequest",
-  "validVersions": "0",
+  "name": "ListConfigResourcesRequest",
+  // Version 0 is used as ListClientMetricsResourcesRequest which only lists 
client metrics resources.
+  // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified 
ResourceTypes, it should return all configuration resources.
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
+    { "name": "ResourceTypes", "type": "[]int8", "versions": "1+", 
"ignorable": true,

Review Comment:
   I don't see the value of `ignorable` here. For a v0 request, the effective 
default is an array containing only `ConfigResource.Type.CLIENT_METRICS`.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2874,16 +2874,61 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
-    val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
+  private def handleListConfigResources(request: RequestChannel.Request): Unit 
= {
+    val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
-      requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+      requestHelper.sendMaybeThrottle(request, 
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-        clientMetricsManager.listClientMetricsResources.stream.map(
-          name => new 
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
-      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+      val data = new ListConfigResourcesResponseData()
+
+      if (request.header.apiVersion() == 0) {
+        // Version 0 only supports client metrics.
+        
data.setConfigResources(clientMetricsManager.listClientMetricsResources.stream.map(name
 =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+          ).collect(Collectors.toList()))
+      } else {
+        // From version 1, supports all ConfigResource.Type.
+        var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+        if (resourceTypes.isEmpty) {
+          resourceTypes = util.List.of(

Review Comment:
   Could we define a constant in `ConfigResource` that includes all of the 
types, so that someone adding a new type does not risk forgetting to update 
this code?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2874,16 +2874,61 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
-    val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
+  private def handleListConfigResources(request: RequestChannel.Request): Unit 
= {
+    val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
-      requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+      requestHelper.sendMaybeThrottle(request, 
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-        clientMetricsManager.listClientMetricsResources.stream.map(
-          name => new 
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
-      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+      val data = new ListConfigResourcesResponseData()
+
+      if (request.header.apiVersion() == 0) {

Review Comment:
   I wonder whether this version-specific behaviour would be better placed in 
`ListConfigResourcesRequest.java` and `ListConfigResourcesResponse.java`, as 
appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to