CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300548
##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends
MetadataCache with Logging w
}
}
+ /**
+ * Get the topic metadata for the given topics.
+ *
+ * The quota is used to limit the number of partitions to return. The
NextTopicPartition field points to the first
+ * partition that can't be returned due to the limit.
+ * If a topic can't return any partition because the quota limit has been reached, this
topic will not be included in the response.
+ *
+ * Note, the topics should be sorted in alphabetical order. The topics in
the DescribeTopicPartitionsResponseData
+ * will also be sorted in alphabetical order.
+ *
+ * @param topics The set of topics and their
corresponding first partition id to fetch.
+ * @param listenerName The listener name.
+ * @param firstTopicPartitionStartIndex The start partition index for the
first topic
+ * @param maximumNumberOfPartitions The max number of partitions to
return.
+ */
+ def getTopicMetadataForDescribeTopicResponse(
+ topics: Seq[String],
+ listenerName: ListenerName,
+ firstTopicPartitionStartIndex: Int,
+ maximumNumberOfPartitions: Int
+ ): DescribeTopicPartitionsResponseData = {
+ val image = _currentImage
+ var remaining = maximumNumberOfPartitions
+ var startIndex = firstTopicPartitionStartIndex
+ val result = new DescribeTopicPartitionsResponseData()
+ topics.foreach { topicName =>
+ if (remaining > 0) {
+ val partitionResponse =
getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+ partitionResponse.map( partitions => {
+ val upperIndex = startIndex + remaining
+ val response = new DescribeTopicPartitionsResponseTopic()
+ .setErrorCode(Errors.NONE.code)
+ .setName(topicName)
+
.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+ .setIsInternal(Topic.isInternal(topicName))
+ .setPartitions(partitions.filter(partition => {
+ partition.partitionIndex() >= startIndex &&
partition.partitionIndex() < upperIndex
+ }).asJava)
+ remaining -= response.partitions().size()
+ result.topics().add(response)
+
+ if (upperIndex < partitions.size) {
+ result.setNextCursor(new Cursor()
+ .setTopicName(topicName)
+ .setPartitionIndex(upperIndex)
+ )
+ remaining = -1
+ }
+ })
+
+ // start index only applies to the first topic. Reset it here.
+ startIndex = 0
+
+ if (!partitionResponse.isDefined) {
+ val error = try {
+ Topic.validate(topicName)
+ Errors.UNKNOWN_TOPIC_OR_PARTITION
Review Comment:
Yes, the cursor topic can be deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]