rondagostino commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1088113461
##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel,
     val describableTopicNames = getDescribableTopics.apply(allowedTopicNames).asJava
     val effectiveRequest = request.duplicate()
     val iterator = effectiveRequest.topics().iterator()
+    var totalRequestedPartitionCount = 0
+    val defaultPartitionCont = config.numPartitions.intValue()
     while (iterator.hasNext) {
       val creatableTopic = iterator.next()
       if (duplicateTopicNames.contains(creatableTopic.name()) || !authorizedTopicNames.contains(creatableTopic.name())) {
         iterator.remove()
-      }
-    }
-    controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response =>
+      } else {
+        if (!creatableTopic.assignments().isEmpty) {
+          totalRequestedPartitionCount += creatableTopic.assignments().size()
+        } else if (creatableTopic.numPartitions() > 0)
+          totalRequestedPartitionCount += creatableTopic.numPartitions()
+        else
+          totalRequestedPartitionCount += defaultPartitionCont
+      }
+    }
+    val future = try {
+      if (!effectiveRequest.validateOnly())
+        controllerMutationQuota.record(totalRequestedPartitionCount)
+      controller.createTopics(context, effectiveRequest, describableTopicNames)
+    } catch {
+      case e: ThrottlingQuotaExceededException =>
+        val apiError = ApiError.fromThrowable(e)
+        val data = new CreateTopicsResponseData
+        effectiveRequest.topics().forEach(topic =>
+          data.topics.add(new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message)))
+        data.setThrottleTimeMs(e.throttleTimeMs())
+        CompletableFuture.completedFuture(data)
+    }

Review Comment:
I actually believe the code is correct as written.
We mark every topic that remains in `effectiveRequest` with the throttling error, and then we fall through to mark the topics that were removed from the effective request with their own error codes (duplicate topics are marked as invalid requests and unauthorized topics as authorization failures). Clearly this is not obvious from the code, though! I'll add a comment describing this.
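To make the flow concrete, here is a minimal, self-contained sketch of the behavior described above. It does not use the real Kafka classes: `Err`, `TopicResult`, and `buildThrottledResponse` are hypothetical stand-ins, and the ordering of the results is an assumption made only so the example is deterministic.

```scala
// Simplified model of the throttled-response path; NOT the actual ControllerApis code.
object ThrottleFallThroughSketch {
  // Hypothetical stand-ins for the Kafka error codes mentioned in the comment.
  sealed trait Err
  case object ThrottlingQuotaExceeded extends Err
  case object InvalidRequest extends Err
  case object AuthorizationFailed extends Err

  final case class TopicResult(name: String, error: Err)

  def buildThrottledResponse(
      requested: Seq[String],
      duplicates: Set[String],
      authorized: Set[String]): Seq[TopicResult] = {
    val names = requested.distinct

    // Step 1: topics left in the effective request (neither duplicate nor
    // unauthorized) all receive the throttling error.
    val throttled = names
      .filter(n => !duplicates.contains(n) && authorized.contains(n))
      .map(TopicResult(_, ThrottlingQuotaExceeded))

    // Step 2 (the "fall through"): topics that were removed from the
    // effective request keep their own, more specific error.
    val duplicateResults = names
      .filter(duplicates.contains)
      .map(TopicResult(_, InvalidRequest))
    val unauthorizedResults = names
      .filter(n => !duplicates.contains(n) && !authorized.contains(n))
      .map(TopicResult(_, AuthorizationFailed))

    throttled ++ duplicateResults ++ unauthorizedResults
  }
}
```

For example, with requested topics `a, b, b, c`, duplicates `{b}`, and authorized topics `{a}`, the sketch reports `a` as throttled, `b` as an invalid request, and `c` as an authorization failure, so every requested topic appears exactly once in the response.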