jolshan commented on a change in pull request #9473: URL: https://github.com/apache/kafka/pull/9473#discussion_r512068125
########## File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala ########## @@ -577,6 +605,27 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo retryRequestsUntilConnected(deleteRequests, expectedControllerEpochZkVersion) } + /** + * Gets the topic IDs for the given topics. + * @param topics the topics we wish to retrieve the Topic IDs for + * @return the Topic IDs + */ + def getTopicIdsForTopics(topics: Set[String]): Map[String, UUID] = { + val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic))) + val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq) + getDataResponses.map { getDataResponse => + val topic = getDataResponse.ctx.get.asInstanceOf[String] + getDataResponse.resultCode match { + case Code.OK => Some(TopicZNode.decode(topic, getDataResponse.data)) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get + } + }.filter(_.flatMap(_.topicId).isDefined) Review comment: In the case where the topic was created on an older version (where there are no topic IDs yet, we will have the case where topic IDs are not defined. However, I believe in the case where this is used, we should have topic IDs defined. (I'm expecting a topic ID on the following line, so an error would occur there if it was missing.) I'm thinking it would make sense to remove the filter line and maybe throw an error here (earlier) if it is not set. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org