jolshan commented on a change in pull request #9473: URL: https://github.com/apache/kafka/pull/9473#discussion_r512056578
########## File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala ########## @@ -481,43 +482,70 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo pathExists(TopicZNode.path(topicName)) } + /** + * Adds a topic ID to existing topic and replica assignments + * @param topicIdReplicaAssignments the TopicIDReplicaAssignments to add a topic ID to + * @return the updated TopicIdReplicaAssigments including the newly created topic IDs + */ + def setTopicIds(topicIdReplicaAssignments: collection.Set[TopicIdReplicaAssignment], + expectedControllerEpochZkVersion: Int): Set[TopicIdReplicaAssignment] = { + val updatedAssignments = topicIdReplicaAssignments.map { + case TopicIdReplicaAssignment(topic, None, assignments) => + TopicIdReplicaAssignment(topic, Some(UUID.randomUUID()), assignments) + case TopicIdReplicaAssignment(topic, Some(_), _) => + throw new IllegalArgumentException("TopicIdReplicaAssignment for " + topic + " already contains a topic ID.") + }.toSet + + val setDataRequests = updatedAssignments.map { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) => + SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicIdOpt.get, assignments), ZkVersion.MatchAnyVersion) + }.toSeq + + retryRequestsUntilConnected(setDataRequests, expectedControllerEpochZkVersion) + updatedAssignments + } + /** * Sets the topic znode with the given assignment. * @param topic the topic whose assignment is being set. + * @param topicId optional topic ID if the topic has one * @param assignment the partition to replica mapping to set for the given topic * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @return SetDataResponse */ def setTopicAssignmentRaw(topic: String, + topicId: UUID, assignment: collection.Map[TopicPartition, ReplicaAssignment], expectedControllerEpochZkVersion: Int): SetDataResponse = { - val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion) + val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicId, assignment), ZkVersion.MatchAnyVersion) retryRequestUntilConnected(setDataRequest, expectedControllerEpochZkVersion) } /** * Sets the topic znode with the given assignment. * @param topic the topic whose assignment is being set. + * @param topicId optional topic ID if the topic has one Review comment: I meant to fix this. Thanks for catching. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org