jolshan commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1054819922
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle an offset commit request */ - def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { - val header = request.header + def handleOffsetCommitRequest( + request: RequestChannel.Request, + requestLocal: RequestLocal + ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - // the callback for sending an offset commit response - def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors - if (isDebugEnabled) - combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { - debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") - } - } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) - } - - // reject the request if not authorized to the group + // Reject the request if not authorized to the group if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( - offsetCommitRequest.data.topics, - error) - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( - new OffsetCommitResponseData() - .setTopics(responseTopicList) - .setThrottleTimeMs(requestThrottleMs) - )) + requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - val errorMap = new mutable.HashMap[TopicPartition, Errors] - for (topicData <- offsetCommitRequest.data.topics.asScala) { - for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION - } - } - sendResponseCallback(errorMap.toMap) + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) } else { - val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition] + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + offsetCommitRequest.data.topics.asScala + )(_.name) + + val responseBuilder = new OffsetCommitResponse.Builder() + val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => + if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. 
+ responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) + } else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + } else { + // Otherwise, we check all partitions to ensure that they all exist. + val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name) - val topics = offsetCommitRequest.data.topics.asScala - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name) - for (topicData <- topics) { - for (partitionData <- topicData.partitions.asScala) { - val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex) - if (!authorizedTopics.contains(topicData.name)) - unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION) - else - authorizedTopicRequestInfoBldr += (topicPartition -> partitionData) + topic.partitions.forEach { partition => + if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { Review Comment: This is more of an observation than a request for a change: it is interesting that we check the metadata cache twice -- once to confirm that the topic exists and again to confirm that the partition info exists. Probably not worth changing in this PR, though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org