rondagostino commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1088113461


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel,
     val describableTopicNames = 
getDescribableTopics.apply(allowedTopicNames).asJava
     val effectiveRequest = request.duplicate()
     val iterator = effectiveRequest.topics().iterator()
+    var totalRequestedPartitionCount = 0
+    val defaultPartitionCont = config.numPartitions.intValue()
     while (iterator.hasNext) {
       val creatableTopic = iterator.next()
       if (duplicateTopicNames.contains(creatableTopic.name()) ||
           !authorizedTopicNames.contains(creatableTopic.name())) {
         iterator.remove()
-      }
-    }
-    controller.createTopics(context, effectiveRequest, 
describableTopicNames).thenApply { response =>
+      } else {
+        if (!creatableTopic.assignments().isEmpty) {
+          totalRequestedPartitionCount += creatableTopic.assignments().size()
+        } else if (creatableTopic.numPartitions() > 0)
+          totalRequestedPartitionCount += creatableTopic.numPartitions()
+        else
+          totalRequestedPartitionCount += defaultPartitionCont
+      }
+    }
+    val future = try {
+      if (!effectiveRequest.validateOnly())
+        controllerMutationQuota.record(totalRequestedPartitionCount)
+      controller.createTopics(context, effectiveRequest, describableTopicNames)
+    } catch {
+      case e: ThrottlingQuotaExceededException =>
+        val apiError = ApiError.fromThrowable(e)
+        val data = new CreateTopicsResponseData
+        effectiveRequest.topics().forEach(topic =>
+          data.topics.add(new 
CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message)))
+        data.setThrottleTimeMs(e.throttleTimeMs())
+        CompletableFuture.completedFuture(data)
+    }

Review Comment:
   I actually believe the code is correct as written.  We mark all of the 
requested topics in `effectiveRequest` with the throttle exception, and then we 
fallthrough to mark all of the topics that were not in the effective request 
with the appropriate error codes (duplicate topics get marked with invalid 
request and unauthorized topics get marked with unauthorized request).  Clearly 
this is not clear though!  I'll add a comment describing this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to