FrankYang0529 commented on code in PR #19461: URL: https://github.com/apache/kafka/pull/19461#discussion_r2044060234
########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ########## @@ -190,12 +203,85 @@ public Builder merge( } }); } - return this; } public OffsetCommitResponse build() { return new OffsetCommitResponse(data); } } + + public static class TopicIdBuilder extends Builder { + private final HashMap<Uuid, OffsetCommitResponseTopic> byTopicId = new HashMap<>(); + + @Override + protected void add(OffsetCommitResponseTopic topic) { + throwIfTopicIdIsNull(topic.topicId()); + data.topics().add(topic); + byTopicId.put(topic.topicId(), topic); + } + + @Override + protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) { + throwIfTopicIdIsNull(topicId); + return byTopicId.get(topicId); + } + + @Override + protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) { + throwIfTopicIdIsNull(topicId); + OffsetCommitResponseTopic topic = byTopicId.get(topicId); + if (topic == null) { + topic = new OffsetCommitResponseTopic() + .setName(topicName) + .setTopicId(topicId); + data.topics().add(topic); + byTopicId.put(topicId, topic); + } + return topic; + } + + private static void throwIfTopicIdIsNull(Uuid topicId) { Review Comment: Not sure whether `throwIfTopicIdIsNull` and `throwIfTopicNameIsNull` is redundant. When we build `OffsetCommitResponseTopic`, the `topicId` and `topicName` data are from `OffsetCommitRequestTopic`. In the generated code, the default name is `""` and default `topicId` is `Uuid.ZERO_UUID`. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -271,40 +271,62 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] - // Reject the request if not authorized to the group + // Reject the request if not authorized to the group. if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion) + + if (useTopicIds) { + offsetCommitRequest.data.topics.forEach { topic => + if (topic.topicId != Uuid.ZERO_UUID) { + metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) + } + } + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, TOPIC, offsetCommitRequest.data.topics.asScala )(_.name) - val responseBuilder = new OffsetCommitResponse.Builder() + val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds) val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => - if (!authorizedTopics.contains(topic.name)) { + if (useTopicIds && topic.name.isEmpty) { Review Comment: Question: why we check topic name is empty when `useTopicIds` is true? Does it implicit mean that topic id is null or `ZERO_UUID` when topic name is empty? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org