lianetm commented on code in PR #19461:
URL: https://github.com/apache/kafka/pull/19461#discussion_r2045463288


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -45,20 +45,24 @@ public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest>
 
         private final OffsetCommitRequestData data;
 
-        public Builder(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {
-            super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion);
+        private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short latestAllowedVersion) {
+            super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion, latestAllowedVersion);
             this.data = data;
         }
 
-        public Builder(OffsetCommitRequestData data) {
-            this(data, false);
+        public static Builder forTopicIdsAndNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {

Review Comment:
   nit: should this be `forTopicIdsOrNames`?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -271,40 +271,62 @@ class KafkaApis(val requestChannel: RequestChannel,
   ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    // Reject the request if not authorized to the group
+    // Reject the request if not authorized to the group.
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion)
+
+      if (useTopicIds) {
+        offsetCommitRequest.data.topics.forEach { topic =>
+          if (topic.topicId != Uuid.ZERO_UUID) {
+            metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
+          }
+        }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
         TOPIC,
         offsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = new OffsetCommitResponse.Builder()
+      val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds)
       val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
       offsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        if (useTopicIds && topic.name.isEmpty) {
+          // If the topic name is undefined, it means that the topic id is unknown so we add
+          // the topic and all its partitions to the response with UNKNOWN_TOPIC_ID.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID)
+        } else if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
           responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)

Review Comment:
   If we're on version >= 10 and include topic.name here, wouldn't we end up
leaking in the response a topic name for which the user is not authorized
(and that wasn't included in the request)?
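
   To make the concern concrete, here is a minimal sketch of one way to avoid
echoing a broker-resolved name back to the caller. It is not the PR's code:
`ResponseTopicSketch`, `ResponseTopic`, and `errorTopic` are hypothetical
stand-ins, and errors are plain strings to keep the example self-contained.

```java
import java.util.UUID;

// Hypothetical, simplified stand-ins; none of these are Kafka classes.
final class ResponseTopicSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Minimal response entry: a topic id, a topic name, and an error label.
    static final class ResponseTopic {
        final UUID topicId;
        final String name;
        final String error;

        ResponseTopic(UUID topicId, String name, String error) {
            this.topicId = topicId;
            this.name = name;
            this.error = error;
        }
    }

    // When topic ids are in use, return only the id that the client sent and
    // leave the name empty, so a name resolved on the broker side is never
    // exposed. Otherwise return the name, which the client supplied itself.
    static ResponseTopic errorTopic(boolean useTopicIds, UUID topicId,
                                    String resolvedName, String error) {
        if (useTopicIds) {
            return new ResponseTopic(topicId, "", error);
        }
        return new ResponseTopic(ZERO_UUID, resolvedName, error);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        ResponseTopic t = errorTopic(true, id, "secret-topic", "TOPIC_AUTHORIZATION_FAILED");
        System.out.println("name is empty: " + t.name.isEmpty()); // prints "name is empty: true"
    }
}
```

   If the name field is not serialized at all for version >= 10, this is moot;
the sketch only matters if the name can reach the wire.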



##########
clients/src/main/resources/common/message/OffsetCommitResponse.json:
##########
@@ -34,7 +34,9 @@
   // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
   // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and
   // GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
-  "validVersions": "2-9",
+  //
+  // Version 10 adds support for topic ids (KIP-848).

Review Comment:
   ditto



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1308,6 +1308,75 @@ public void testConsumerGroupOffsetCommit() {
         );
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithTopicIds() {
+        Uuid topicId = Uuid.randomUuid();
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create an empty group.
+        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .build()
+        );
+
+        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(10)
+                .setTopics(List.of(
+                    new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                        .setTopicId(topicId)
+                        .setName("bar")

Review Comment:
   Just to check: in practice, I would expect that having both never happens
with the current approach (the request contains either the name if v<10, or
the topic id if v>=10). Is my understanding correct?
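
   The expectation above can be sketched as follows. This is only an
illustration of the either/or rule as stated in this comment, not the client
code: `TopicFieldSketch`, `RequestTopic`, and `forVersion` are hypothetical
names, and the version threshold of 10 is taken from the discussion.

```java
import java.util.UUID;

// Hypothetical, simplified stand-ins; none of these are Kafka classes.
final class TopicFieldSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Assumption from the review discussion: v>=10 carries topic ids only.
    static final short FIRST_TOPIC_ID_VERSION = 10;

    static final class RequestTopic {
        UUID topicId = ZERO_UUID;
        String name = "";
    }

    // Populate exactly one of the two identifying fields, based on the version.
    static RequestTopic forVersion(short version, UUID topicId, String name) {
        RequestTopic topic = new RequestTopic();
        if (version >= FIRST_TOPIC_ID_VERSION) {
            topic.topicId = topicId; // name stays empty
        } else {
            topic.name = name;       // topic id stays zero
        }
        return topic;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        RequestTopic v9 = forVersion((short) 9, id, "bar");
        RequestTopic v10 = forVersion((short) 10, id, "bar");
        System.out.println("v9 has id: " + !v9.topicId.equals(ZERO_UUID));   // prints "v9 has id: false"
        System.out.println("v10 has name: " + !v10.name.isEmpty());          // prints "v10 has name: false"
    }
}
```

   If that rule holds, a test that sets both fields exercises a combination
the client would not produce on its own.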



##########
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##########
@@ -36,8 +36,11 @@
   //
   // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The
   // request is the same as version 8.
-  "validVersions": "2-9",
+  //
+  // Version 10 adds support for topic ids (KIP-848).

Review Comment:
   Should we clarify that it also removes support for topic names?


