cmccabe commented on code in PR #13116: URL: https://github.com/apache/kafka/pull/13116#discussion_r1084419497
########## core/src/main/scala/kafka/server/ControllerApis.scala: ########## @@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel, val describableTopicNames = getDescribableTopics.apply(allowedTopicNames).asJava val effectiveRequest = request.duplicate() val iterator = effectiveRequest.topics().iterator() + var totalRequestedPartitionCount = 0 + val defaultPartitionCont = config.numPartitions.intValue() while (iterator.hasNext) { val creatableTopic = iterator.next() if (duplicateTopicNames.contains(creatableTopic.name()) || !authorizedTopicNames.contains(creatableTopic.name())) { iterator.remove() - } - } - controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response => + } else { + if (!creatableTopic.assignments().isEmpty) { + totalRequestedPartitionCount += creatableTopic.assignments().size() + } else if (creatableTopic.numPartitions() > 0) + totalRequestedPartitionCount += creatableTopic.numPartitions() + else + totalRequestedPartitionCount += defaultPartitionCont + } + } + val future = try { + if (!effectiveRequest.validateOnly()) + controllerMutationQuota.record(totalRequestedPartitionCount) + controller.createTopics(context, effectiveRequest, describableTopicNames) + } catch { + case e: ThrottlingQuotaExceededException => + val apiError = ApiError.fromThrowable(e) + val data = new CreateTopicsResponseData + effectiveRequest.topics().forEach(topic => + data.topics.add(new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message))) + data.setThrottleTimeMs(e.throttleTimeMs()) + CompletableFuture.completedFuture(data) + } Review Comment: I don't think you need to catch this here, since we have a catch block on the request handler that will deal with "send back error X to response Y". If you do have a catch block here you'd certainly need a return statement as well. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org