AndrewJSchofield commented on code in PR #19493: URL: https://github.com/apache/kafka/pull/19493#discussion_r2079364838
########## clients/src/main/resources/common/message/ListConfigResourcesResponse.json: ########## @@ -16,18 +16,22 @@ { "apiKey": 74, "type": "response", - "name": "ListClientMetricsResourcesResponse", - "validVersions": "0", + "name": "ListConfigResourcesResponse", + // Version 0 is used as ListClientMetricsResourcesResponse which returns all client metrics resources. + // Version 1 adds ResourceType to ConfigResources (KIP-1142). + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, - { "name": "ClientMetricsResources", "type": "[]ClientMetricsResource", "versions": "0+", - "about": "Each client metrics resource in the response.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", - "about": "The resource name." } + { "name": "ConfigResources", "type": "[]ConfigResource", "versions": "0+", + "about": "Each config resource in the response.", "fields": [ + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name." }, + { "name": "ResourceType", "type": "int8", "versions": "1+", "ignorable": true, Review Comment: Should this be `"default": "16"`? I'm still researching this, so that's an open question. ########## clients/src/main/resources/common/message/ListConfigResourcesRequest.json: ########## @@ -17,10 +17,14 @@ "apiKey": 74, "type": "request", "listeners": ["broker"], - "name": "ListClientMetricsResourcesRequest", - "validVersions": "0", + "name": "ListConfigResourcesRequest", + // Version 0 is used as ListClientMetricsResourcesRequest which only lists client metrics resources. + // Version 1 adds ResourceTypes field (KIP-1142). 
If there is no specified ResourceTypes, it should return all configuration resources. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ + { "name": "ResourceTypes", "type": "[]int8", "versions": "1+", "ignorable": true, Review Comment: I don't see the value of `ignorable` here. If it's v0, the default is essentially an array containing `ConfigResource.CLIENT_METRICS`. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2874,16 +2874,61 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { - val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] + private def handleListConfigResources(request: RequestChannel.Request): Unit = { + val listConfigResourcesRequest = request.body[ListConfigResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { - requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) + requestHelper.sendMaybeThrottle(request, listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( - clientMetricsManager.listClientMetricsResources.stream.map( - name => new ClientMetricsResource().setName(name)).collect(Collectors.toList())) - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + val data = new ListConfigResourcesResponseData() + + if (request.header.apiVersion() == 0) { + // Version 0 only supports client metrics. 
+ data.setConfigResources(clientMetricsManager.listClientMetricsResources.stream.map(name => + new ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id) + ).collect(Collectors.toList())) + } else { + // From version 1, supports all ConfigResource.Type. + var resourceTypes = listConfigResourcesRequest.data().resourceTypes() + if (resourceTypes.isEmpty) { + resourceTypes = util.List.of( Review Comment: Could we define a constant in `ConfigResource` that includes all of the types so that someone adding a type will not risk missing changing this code? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2874,16 +2874,61 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { - val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] + private def handleListConfigResources(request: RequestChannel.Request): Unit = { + val listConfigResourcesRequest = request.body[ListConfigResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { - requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) + requestHelper.sendMaybeThrottle(request, listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( - clientMetricsManager.listClientMetricsResources.stream.map( - name => new ClientMetricsResource().setName(name)).collect(Collectors.toList())) - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + val data = new ListConfigResourcesResponseData() + + if (request.header.apiVersion() == 0) { Review Comment: I wonder whether this version-specific behaviour would be better put into 
`ListConfigResourcesRequest.java` and `ListConfigResourcesResponse.java` as appropriate.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org