CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1400015238


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel,
     ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+    metadataCache match {
+      case _: ZkMetadataCache =>
+        throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+      case _ =>
+    }
+    val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+    val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data()
+    val topicSet = mutable.SortedSet[String]()
+    describeTopicPartitionsRequest.topics().forEach(topic => topicSet.add(topic.name()))
+    val topics = ListBuffer[String]().addAll(topicSet)
+
+    val cursor = describeTopicPartitionsRequest.cursor()
+    val fetchAllTopics = topics.isEmpty
+    if (fetchAllTopics) {
+      kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic))

Review Comment:
   In the fetch-all path, no additional sort is required.
   I did not see a good way to convert a Java list to a Scala mutable list, so I copied the elements.
   I use a mutable list for two reasons:
   1. It makes it easier to filter out the topics that sort alphabetically ahead of the cursor topic.
   2. In the fetch-all case, I think we should still include the cursor topic in the response even if it does not exist. A mutable list makes that easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
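
For reference, the two reasons given in the review comment can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in the PR: the object `CursorTopicSketch` and the helper `topicsFromCursor` are hypothetical names, the input is assumed to be an already sorted `java.util.List[String]`, and the sketch assumes Scala 2.13 (`scala.jdk.CollectionConverters`, `ListBuffer.from`, `dropWhileInPlace`).

```scala
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

object CursorTopicSketch {
  // Hypothetical helper (not part of the PR): returns the topics to describe,
  // starting at the cursor topic. `sortedTopics` is assumed to be sorted already.
  def topicsFromCursor(sortedTopics: java.util.List[String],
                       cursorTopic: Option[String]): ListBuffer[String] = {
    // asScala only wraps the Java list; ListBuffer.from copies it into an
    // independent Scala mutable list.
    val topics = ListBuffer.from(sortedTopics.asScala)
    cursorTopic.foreach { cursor =>
      // Reason 1: drop every topic that sorts strictly before the cursor topic.
      topics.dropWhileInPlace(_.compareTo(cursor) < 0)
      // Reason 2: keep the cursor topic at the head even when it is not among
      // the known topics, so the caller can still report on it.
      if (!topics.headOption.contains(cursor)) topics.prepend(cursor)
    }
    topics
  }

  def main(args: Array[String]): Unit = {
    val known = java.util.Arrays.asList("a", "b", "d")
    println(topicsFromCursor(known, Some("c")))
    println(topicsFromCursor(known, None))
  }
}
```

With a mutable buffer, both the in-place drop and the prepend avoid building intermediate collections, which seems to be the convenience the comment refers to.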