cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1084419497


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel,
     val describableTopicNames = 
getDescribableTopics.apply(allowedTopicNames).asJava
     val effectiveRequest = request.duplicate()
     val iterator = effectiveRequest.topics().iterator()
+    var totalRequestedPartitionCount = 0
+    val defaultPartitionCont = config.numPartitions.intValue()
     while (iterator.hasNext) {
       val creatableTopic = iterator.next()
       if (duplicateTopicNames.contains(creatableTopic.name()) ||
           !authorizedTopicNames.contains(creatableTopic.name())) {
         iterator.remove()
-      }
-    }
-    controller.createTopics(context, effectiveRequest, 
describableTopicNames).thenApply { response =>
+      } else {
+        if (!creatableTopic.assignments().isEmpty) {
+          totalRequestedPartitionCount += creatableTopic.assignments().size()
+        } else if (creatableTopic.numPartitions() > 0)
+          totalRequestedPartitionCount += creatableTopic.numPartitions()
+        else
+          totalRequestedPartitionCount += defaultPartitionCont
+      }
+    }
+    val future = try {
+      if (!effectiveRequest.validateOnly())
+        controllerMutationQuota.record(totalRequestedPartitionCount)
+      controller.createTopics(context, effectiveRequest, describableTopicNames)
+    } catch {
+      case e: ThrottlingQuotaExceededException =>
+        val apiError = ApiError.fromThrowable(e)
+        val data = new CreateTopicsResponseData
+        effectiveRequest.topics().forEach(topic =>
+          data.topics.add(new 
CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message)))
+        data.setThrottleTimeMs(e.throttleTimeMs())
+        CompletableFuture.completedFuture(data)
+    }

Review Comment:
   I don't think you need to catch this here, since we have a catch block in 
the request handler that will deal with "send back error X to response Y".
   
   If you do have a catch block here, you'd certainly need a return statement as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to