jolshan commented on a change in pull request #9473:
URL: https://github.com/apache/kafka/pull/9473#discussion_r512056578



##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -481,43 +482,70 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     pathExists(TopicZNode.path(topicName))
   }
 
+  /**
+   * Adds a topic ID to existing topic and replica assignments
+   * @param topicIdReplicaAssignments the TopicIdReplicaAssignments to add a 
topic ID to
+   * @return the updated TopicIdReplicaAssignments including the newly created 
topic IDs
+   */
+  def setTopicIds(topicIdReplicaAssignments: 
collection.Set[TopicIdReplicaAssignment],
+                  expectedControllerEpochZkVersion: Int): 
Set[TopicIdReplicaAssignment] = {
+    val updatedAssignments = topicIdReplicaAssignments.map {
+      case TopicIdReplicaAssignment(topic, None, assignments) =>
+        TopicIdReplicaAssignment(topic, Some(UUID.randomUUID()), assignments)
+      case TopicIdReplicaAssignment(topic, Some(_), _) =>
+        throw new IllegalArgumentException("TopicIdReplicaAssignment for " + 
topic + " already contains a topic ID.")
+    }.toSet
+
+    val setDataRequests = updatedAssignments.map { case 
TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
+      SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicIdOpt.get, 
assignments), ZkVersion.MatchAnyVersion)
+    }.toSeq
+
+    retryRequestsUntilConnected(setDataRequests, 
expectedControllerEpochZkVersion)
+    updatedAssignments
+  }
+
   /**
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
+   * @param topicId optional topic ID if the topic has one
    * @param assignment the partition to replica mapping to set for the given 
topic
    * @param expectedControllerEpochZkVersion expected controller epoch 
zkVersion.
    * @return SetDataResponse
    */
   def setTopicAssignmentRaw(topic: String,
+                            topicId: UUID,
                             assignment: collection.Map[TopicPartition, 
ReplicaAssignment],
                             expectedControllerEpochZkVersion: Int): 
SetDataResponse = {
-    val setDataRequest = SetDataRequest(TopicZNode.path(topic), 
TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
+    val setDataRequest = SetDataRequest(TopicZNode.path(topic), 
TopicZNode.encode(topicId, assignment), ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(setDataRequest, 
expectedControllerEpochZkVersion)
   }
 
   /**
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
+   * @param topicId optional topic ID if the topic has one

Review comment:
       I meant to fix this. Thanks for catching.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to