dajac commented on code in PR #19461: URL: https://github.com/apache/kafka/pull/19461#discussion_r2047359577
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -271,40 +271,62 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] - // Reject the request if not authorized to the group + // Reject the request if not authorized to the group. if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion) + + if (useTopicIds) { + offsetCommitRequest.data.topics.forEach { topic => + if (topic.topicId != Uuid.ZERO_UUID) { + metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) + } + } + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, TOPIC, offsetCommitRequest.data.topics.asScala )(_.name) - val responseBuilder = new OffsetCommitResponse.Builder() + val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds) val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => - if (!authorizedTopics.contains(topic.name)) { + if (useTopicIds && topic.name.isEmpty) { + // If the topic name is undefined, it means that the topic id is unknown so we add + // the topic and all its partitions to the response with UNKNOWN_TOPIC_ID. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID) + } else if (!authorizedTopics.contains(topic.name)) { // If the topic is not authorized, we add the topic and all its partitions // to the response with TOPIC_AUTHORIZATION_FAILED. responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( - topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) Review Comment: No because the topic name is not serialized in the response when version >= 10. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org