CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1400015238


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel,
       ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+    metadataCache match {
+      case _: ZkMetadataCache =>
+        throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+      case _ =>
+    }
+    val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+    val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+    val topicSet = mutable.SortedSet[String]()
+    describeTopicPartitionsRequest.topics().forEach(topic => 
topicSet.add(topic.name()))
+    val topics = ListBuffer[String]().addAll(topicSet)
+
+    val cursor = describeTopicPartitionsRequest.cursor()
+    val fetchAllTopics = topics.isEmpty
+    if (fetchAllTopics) {
+      kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic))

Review Comment:
   In the fetch all path, no additional sort is required. I did not see a good 
way to convert Java list to a scala mutable list, so I did the copy.
   Use a mutable list for 2 reasons
   1. It is easier to filter out the topics alphabetically ahead of the cursor 
topic
   2. In the fetch all case, I think we should still include the cursor topic 
in the response if it does not exist. Mutable list make it easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to