josefk31 commented on code in PR #17604:
URL: https://github.com/apache/kafka/pull/17604#discussion_r1819789470


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1136,6 +1139,34 @@ ControllerResult<AlterPartitionResponseData> 
alterPartition(
         return ControllerResult.of(records, response);
     }
 
+    /**
+     * Validates that a batch of topics will create less than {@value 
MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
+     * has led to out-of-memory exceptions. We use this validation to fail 
earlier to avoid allocating the memory.
+     * This validation assumes that all the topics in the batch are valid and 
can be created.
+     *
+     * @param request a batch of topics to create.
+     * @param defaultNumPartitions default number of partitions to assign if 
unspecified.
+     * @throws PolicyViolationException if total number of partitions exceeds 
{@value MAX_PARTITIONS_PER_BATCH}.
+     */
+    static void 
validateEstimatedTotalNumberOfPartitions(CreateTopicsRequestData request, int 
defaultNumPartitions) {
+        int totalPartitions = 0;
+        for (CreatableTopic topic: request.topics()) {
+            if (topic.assignments().isEmpty()) {
+                if (topic.numPartitions() == -1) {
+                    totalPartitions += defaultNumPartitions;
+                } else if (topic.numPartitions() > 0) {
+                    totalPartitions += topic.numPartitions();
+                }
+            } else {
+                totalPartitions += topic.assignments().size();

Review Comment:
   Reread the code so have a better understanding now. Outlining my reasoning 
here:
   
   There are two ways to create topics:
   1. Manually assign partitions and replicas for topics (`assignments`)
   2. Specify a number of partitions and have Kafka automatically derive 
replications.
   
   We don't actually know how many topics (and partitions) will be created 
until running various validations in each `createTopic` call. The most recent 
code calculates the maximum number of partitions which could be created and 
throws exception if the request goes above the limit. I think this is fine 
because the user intent is to create a batch with too many partitions. We add 
an edge case if we exclude misconfigured or invalid partitions from the limit...
   
   IUUC, each `assignment` can be treated as creating `1` partition so we can 
use the size of this collection as counting towards the limit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to