josefk31 commented on code in PR #17604:
URL: https://github.com/apache/kafka/pull/17604#discussion_r1819789470
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########

@@ -1136,6 +1139,34 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
         return ControllerResult.of(records, response);
     }

+    /**
+     * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
+     * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory.
+     * This validation assumes that all the topics in the batch are valid and can be created.
+     *
+     * @param request a batch of topics to create.
+     * @param defaultNumPartitions default number of partitions to assign if unspecified.
+     * @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}.
+     */
+    static void validateEstimatedTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) {
+        int totalPartitions = 0;
+        for (CreatableTopic topic: request.topics()) {
+            if (topic.assignments().isEmpty()) {
+                if (topic.numPartitions() == -1) {
+                    totalPartitions += defaultNumPartitions;
+                } else if (topic.numPartitions() > 0) {
+                    totalPartitions += topic.numPartitions();
+                }
+            } else {
+                totalPartitions += topic.assignments().size();

Review Comment:

Reread the code, so I have a better understanding now. Outlining my reasoning here:

There are two ways to create topics:
1. Manually assign partitions and replicas for each topic (`assignments`).
2. Specify a number of partitions and let Kafka derive the replica assignments automatically.

We don't actually know how many topics (and partitions) will be created until the various validations in each `createTopic` call have run. The most recent code calculates the maximum number of partitions that could be created and throws an exception if the request goes above the limit. I think this is fine, because the user's intent is still to create a batch with too many partitions, and excluding misconfigured or invalid topics from the count would only add an edge case. IIUC, each entry in `assignments` can be treated as creating `1` partition, so we can use the size of that collection when counting towards the limit.
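To make the counting concrete, here is a rough sketch of how the estimate plays out for a mixed batch. This is purely illustrative and not part of the PR: the `CreatableTopic` / `CreatableReplicaAssignment` accessors are written from memory of the generated `CreateTopicsRequestData` classes, and since the method is package-private the sketch assumes it lives alongside the tests in `org.apache.kafka.controller`.

```java
// Illustrative sketch only; accessor names are assumed from the generated
// CreateTopicsRequestData message classes and may not match exactly.
package org.apache.kafka.controller;

import java.util.Arrays;

import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;

public class PartitionEstimateSketch {
    public static void main(String[] args) {
        CreateTopicsRequestData request = new CreateTopicsRequestData();

        // Way 2: a partition count, replicas derived by Kafka -> counts as numPartitions (3).
        request.topics().add(new CreatableTopic()
            .setName("auto-assigned")
            .setNumPartitions(3)
            .setReplicationFactor((short) 2));

        // Way 2 with defaults: numPartitions == -1 -> counts as defaultNumPartitions.
        request.topics().add(new CreatableTopic()
            .setName("defaulted")
            .setNumPartitions(-1));

        // Way 1: manual assignments -> each assignment entry counts as one partition (2 here).
        CreatableTopic manual = new CreatableTopic().setName("manually-assigned");
        manual.assignments().add(new CreatableReplicaAssignment()
            .setPartitionIndex(0)
            .setBrokerIds(Arrays.asList(1, 2)));
        manual.assignments().add(new CreatableReplicaAssignment()
            .setPartitionIndex(1)
            .setBrokerIds(Arrays.asList(2, 3)));
        request.topics().add(manual);

        // With defaultNumPartitions = 1 the estimate is 3 + 1 + 2 = 6, so this only
        // throws PolicyViolationException if 6 exceeded MAX_PARTITIONS_PER_BATCH.
        ReplicationControlManager.validateEstimatedTotalNumberOfPartitions(request, 1);
    }
}
```

The nice property is that the check depends only on the sizes declared in the request itself, so a too-large batch is rejected before any per-topic work or memory allocation happens.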