dajac commented on code in PR #19461:
URL: https://github.com/apache/kafka/pull/19461#discussion_r2047359577


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -271,40 +271,62 @@ class KafkaApis(val requestChannel: RequestChannel,
   ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    // Reject the request if not authorized to the group
+    // Reject the request if not authorized to the group.
     if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val useTopicIds = 
OffsetCommitResponse.useTopicIds(request.header.apiVersion)
+
+      if (useTopicIds) {
+        offsetCommitRequest.data.topics.forEach { topic =>
+          if (topic.topicId != Uuid.ZERO_UUID) {
+            metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
+          }
+        }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
         TOPIC,
         offsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = new OffsetCommitResponse.Builder()
+      val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds)
       val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
       offsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        if (useTopicIds && topic.name.isEmpty) {
+          // If the topic name is undefined, it means that the topic id is 
unknown so we add
+          // the topic and all its partitions to the response with 
UNKNOWN_TOPIC_ID.
+          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_ID)
+        } else if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
           
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)

Review Comment:
   No because the topic name is not serialized in the response when version >= 
10. We can basically rely on the response serialization logic to pick up the id 
or the name depending on the version.



##########
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##########
@@ -36,8 +36,11 @@
   //
   // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The
   // request is the same as version 8.
-  "validVersions": "2-9",
+  //
+  // Version 10 adds support for topic ids (KIP-848).

Review Comment:
   Will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to