dajac commented on code in PR #19461:
URL: https://github.com/apache/kafka/pull/19461#discussion_r2047359577


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -271,40 +271,62 @@ class KafkaApis(val requestChannel: RequestChannel,
   ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    // Reject the request if not authorized to the group
+    // Reject the request if not authorized to the group.
     if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val useTopicIds = 
OffsetCommitResponse.useTopicIds(request.header.apiVersion)
+
+      if (useTopicIds) {
+        offsetCommitRequest.data.topics.forEach { topic =>
+          if (topic.topicId != Uuid.ZERO_UUID) {
+            metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
+          }
+        }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
         TOPIC,
         offsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = new OffsetCommitResponse.Builder()
+      val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds)
       val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
       offsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        if (useTopicIds && topic.name.isEmpty) {
+          // If the topic name is undefined, it means that the topic id is 
unknown so we add
+          // the topic and all its partitions to the response with 
UNKNOWN_TOPIC_ID.
+          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_ID)
+        } else if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
           
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)

Review Comment:
   No because the topic name is not serialized in the response when version >= 
10.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to