akhileshchg commented on code in PR #12815:
URL: https://github.com/apache/kafka/pull/12815#discussion_r1024674845


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1571,6 +1638,10 @@ private void resetToEmptyState() {
      */
     private final FeatureControlManager featureControl;
 
+    private final KRaftMetadataListener listener;
+
+    public final MigrationListener migrationListener = new MigrationListener(); // TODO clean this up

Review Comment:
   These two names get confusing quickly. In my understanding, `KRaftMetadataListener` is for replicating data consistently to ZooKeeper from the KRaft metadata log, and `MigrationListener` is for migrating ZooKeeper data to the KRaft metadata log.
   
   Can we rename `migrationListener` to `kraftToZkMigrationHandler` or something to that effect, and `listener` to `zkToKRaftMigrationHandler`?
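   
   For illustration, the suggested renames would look something like this (names are placeholders, not final):
   
   ```java
   // Hypothetical renames in QuorumController, following the suggestion above:
   private final KRaftMetadataListener zkToKRaftMigrationHandler;
   
   public final MigrationListener kraftToZkMigrationHandler = new MigrationListener(); // TODO clean this up
   ```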



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -156,6 +157,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     tryCreateControllerZNodeAndIncrementEpoch()
   }
 
+  /**
+   * Registers a given KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method,
+   * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
+   * leadership during a KRaft leadership failover.
+   *
+   * This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during
+   * the migration.
+   *
+   * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
+   * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method,
+   * the multi request transaction will fail and this method will return None.
+   *
+   * @param kraftControllerId ID of the KRaft controller node
+   * @param kraftControllerEpoch Epoch of the KRaft controller node
+   * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
+   */
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {

Review Comment:
   Just a design consideration: do you think this should be part of the `MigrationClient` interface instead?
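   
   If it did move, it might look roughly like the sketch below. The return type and javadoc are illustrative (the Scala `Option[Int]` would become something like `OptionalInt` on the Java side); this is not the actual interface from the PR:
   
   ```java
   import java.util.OptionalInt;
   
   public interface MigrationClient {
       // ... existing methods ...
   
       /**
        * Claim ZK controller leadership on behalf of the given KRaft controller.
        * Returns the new zkVersion of /controller_epoch, or empty if a ZK
        * controller won a concurrent election.
        */
       OptionalInt tryRegisterKRaftControllerAsActiveController(int kraftControllerId, int kraftControllerEpoch);
   }
   ```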



##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = {
+      val registration = new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID,

Review Comment:
   Just wondering, do we use the incarnation id for anything?



##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = {
+      val registration = new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID,
+        Collections.emptyList[Endpoint], Collections.emptyMap[String, VersionRange],
+        Optional.empty(), false, false)
+      new ZkBrokerRegistration(registration, null, null, false)
+  }
+}
+
+class ZkMigrationClient(zkClient: KafkaZkClient,
+                        controllerChannelManager: ControllerChannelManager) extends MigrationClient with Logging {
+
+  def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: Int): ZkControllerState = {
+    val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, kraftControllerEpoch)
+    if (epochZkVersionOpt.isDefined) {
+      new ZkControllerState(kraftControllerId, kraftControllerEpoch, epochZkVersionOpt.get)
+    } else {
+      throw new ControllerMovedException("Cannot claim controller leadership, the controller has moved.")
+    }
+  }
+
+  def migrateTopics(metadataVersion: MetadataVersion,
+                    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+                    brokerIdConsumer: Consumer[Integer]): Unit = {
+    val topics = zkClient.getAllTopicsInCluster()
+    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
+      val partitions = assignments.keys.toSeq
+      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+      val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+      topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+        .setName(topic)
+        .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+      assignments.foreach { case (topicPartition, replicaAssignment) =>
+        replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+        replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+
+        val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition)
+        topicBatch.add(new ApiMessageAndVersion(new PartitionRecord()
+          .setTopicId(topicIdOpt.get)
+          .setPartitionId(topicPartition.partition)
+          .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
+          .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+          .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+          .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+          .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+          .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()), PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      val props = topicConfigs(topic)
+      props.forEach { case (key: Object, value: Object) =>
+        topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.TOPIC.id)
+          .setResourceName(topic)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      recordConsumer.accept(topicBatch)
+    }
+  }
+
+  def migrateBrokerConfigs(metadataVersion: MetadataVersion,
+                           recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+    val batch = new util.ArrayList[ApiMessageAndVersion]()
+    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+      val brokerResource = if (broker == ConfigEntityName.Default) {
+        ""
+      } else {
+        broker
+      }
+      props.forEach { case (key: Object, value: Object) =>
+        batch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.BROKER.id)
+          .setResourceName(brokerResource)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+    }
+    recordConsumer.accept(batch)
+  }
+
+  def migrateClientQuotas(metadataVersion: MetadataVersion,
+                          recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    def migrateEntityType(entityType: String): Unit = {
+      adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
+        val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+        val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+          batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+            .setEntity(List(entity).asJava)
+            .setKey(key)
+            .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+        }
+        recordConsumer.accept(batch)
+      }
+    }
+
+    migrateEntityType(ConfigType.User)
+    migrateEntityType(ConfigType.Client)
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+      // Lifted from ZkAdminManager
+      val components = name.split("/")
+      if (components.size != 3 || components(1) != "clients")
+        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+      val entity = List(
+        new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+        new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+      )
+
+      val batch = new util.ArrayList[ApiMessageAndVersion]()
+      ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+        batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+          .setEntity(entity.asJava)
+          .setKey(key)
+          .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+      recordConsumer.accept(batch)
+    }
+
+    migrateEntityType(ConfigType.Ip)
+  }
+
+  def migrateProducerId(metadataVersion: MetadataVersion,
+                        recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt match {
+      case Some(data) =>
+        val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+        recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
+          .setBrokerEpoch(-1)
+          .setBrokerId(producerIdBlock.assignedBrokerId)
+          .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+      case None => // Nothing to migrate
+    }
+  }
+
+  override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]], brokerIdConsumer: Consumer[Integer]): Unit = {
+    migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
+    migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
+    migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
+    migrateProducerId(MetadataVersion.latest(), batchConsumer)
+  }
+
+  override def watchZkBrokerRegistrations(listener: MigrationClient.BrokerRegistrationListener): Unit = {
+    val brokersHandler = new ZNodeChildChangeHandler() {
+      override val path: String = BrokerIdsZNode.path
+
+      override def handleChildChange(): Unit = listener.onBrokersChange()
+    }
+    System.err.println("Adding /brokers watch")
+    zkClient.registerZNodeChildChangeHandler(brokersHandler)
+
+    def brokerHandler(brokerId: Int): ZNodeChangeHandler = {
+      new ZNodeChangeHandler() {
+        override val path: String = BrokerIdZNode.path(brokerId)
+
+        override def handleDataChange(): Unit = listener.onBrokerChange(brokerId)
+      }
+    }
+
+    val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
+    curBrokerAndEpochs.foreach { case (broker, _) =>
+      System.err.println(s"Adding /brokers/${broker.id} watch")
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerHandler(broker.id))
+    }
+
+    listener.onBrokersChange()
+  }
+
+  override def readBrokerRegistration(brokerId: Int): Optional[ZkBrokerRegistration] = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    if (brokerAndEpoch.isEmpty) {
+      Optional.empty()
+    } else {
+      Optional.of(brokerToBrokerRegistration(brokerAndEpoch.head._1, brokerAndEpoch.head._2))
+    }
+  }
+
+  override def readBrokerIds(): util.Set[Integer] = {
+    zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+  }
+
+  override def addZkBroker(brokerId: Int): Unit = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    controllerChannelManager.addBroker(brokerAndEpoch.head._1)
+  }
+
+  override def removeZkBroker(brokerId: Int): Unit = {
+    controllerChannelManager.removeBroker(brokerId)
+  }
+
+  override def getOrCreateMigrationRecoveryState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.getOrCreateMigrationState(initialState)
+  }
+
+  override def setMigrationRecoveryState(state: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.updateMigrationState(state)
+  }
+
+  override def sendRequestToBroker(brokerId: Int,
+                                   request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+                                   callback: Consumer[AbstractResponse]): Unit = {
+    controllerChannelManager.sendRequest(brokerId, request, callback.accept)
+  }
+
+  override def createTopic(topicName: String, topicId: Uuid, partitions: util.Map[Integer, PartitionRegistration], state: MigrationRecoveryState): MigrationRecoveryState = {
+    val assignments = partitions.asScala.map { case (partitionId, partition) =>
+      new TopicPartition(topicName, partitionId) -> ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+    }
+
+    val createTopicZNode = {
+      val path = TopicZNode.path(topicName)
+      CreateRequest(
+        path,
+        TopicZNode.encode(Some(topicId), assignments),
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+    val createPartitionsZNode = {
+      val path = TopicPartitionsZNode.path(topicName)
+      CreateRequest(
+        path,
+        null,
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+
+    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+      val topicPartition = new TopicPartition(topicName, partitionId)
+      Seq(
+        createTopicPartition(topicPartition),
+        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+      )
+    }
+
+    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
+    val path = TopicPartitionZNode.path(topicPartition)
+    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def createTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): CreateRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def updateTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): SetDataRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+  }
+
+  override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+                                     state: MigrationRecoveryState): MigrationRecoveryState = {
+    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+        val topicPartition = new TopicPartition(topicName, partitionId)
+        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+      }
+    }
+    if (requests.isEmpty) {
+      state
+    } else {
+      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state.controllerZkVersion(), state)
+      responses.foreach(System.err.println)
+      state.withZkVersion(migrationZkVersion)
+    }
+  }
+
+  override def createKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = CreateRequest(brokerInfo.path, brokerInfo.toJsonBytes, zkClient.defaultAcls(brokerInfo.path), CreateMode.PERSISTENT)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  override def updateKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = SetDataRequest(BrokerIdZNode.path(brokerId), brokerInfo.toJsonBytes, ZkVersion.MatchAnyVersion)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  override def removeKRaftBroker(brokerId: Int, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val req = DeleteRequest(BrokerIdZNode.path(brokerId), ZkVersion.MatchAnyVersion)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+}

Review Comment:
   Ditto. Why do we need these operations for KRaft brokers?



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -531,8 +537,15 @@ class KafkaServer(
     )
 
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+
+    val migrationInfo = if (config.interBrokerProtocolVersion.isAtLeast(IBP_3_4_IV0)) {
+      Some(BrokerMigration(clusterId, config.interBrokerProtocolVersion, enabled = true))
+    } else {
+      None
+    }

Review Comment:
   Don't we need to check for the migration flag here?



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -1772,6 +1895,106 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  // Perform a sequence of updates to ZooKeeper as part of a KRaft dual write. In addition to adding a CheckOp on the
+  // controller epoch ZNode, we also include CheckOp/SetDataOp on the migration ZNode. This ensures proper fencing
+  // from errant ZK controllers as well as fencing from new KRaft controllers.
+  def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
+                                                                expectedControllerZkVersion: Int,
+                                                                migrationState: MigrationRecoveryState): (Int, Seq[Req#Response]) = {
+
+    if (requests.isEmpty) {
+      throw new IllegalArgumentException("Must specify at least one ZK request for a migration operation.")
+    }
+    def wrapMigrationRequest(request: Req, updateMigrationNode: Boolean): MultiRequest = {
+      val checkOp = CheckOp(ControllerEpochZNode.path, expectedControllerZkVersion)
+      val migrationOp = if (updateMigrationNode) {
+        SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+      } else {
+        CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
+      }

Review Comment:
   So we generate ZK writes for a given offset and update the offset at the end of all the successful writes. But if there's a failure in between the ZK writes, we have to implement some idempotency for those writes in case we retry.
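   
   Since each batch is wrapped in a single ZK multi() with a CheckOp/SetDataOp on the migration znode, a retry could in principle detect whether the previous attempt actually committed by re-reading that znode. A minimal sketch of the idea, assuming hypothetical helpers (only `migrationZkVersion()` exists in this PR):
   
   ```java
   // Sketch only: decide whether a failed-looking dual write actually landed.
   // readMigrationState() and retryBatch() are hypothetical helpers.
   MigrationRecoveryState retryDualWrite(MigrationRecoveryState state) throws Exception {
       int expectedVersion = state.migrationZkVersion();
       MigrationRecoveryState current = readMigrationState(); // re-read /migration
       if (current.migrationZkVersion() > expectedVersion) {
           // The earlier multi() committed atomically before the failure was
           // observed, so the writes are already applied; just adopt the state.
           return current;
       }
       // Nothing committed (the CheckOp on /migration fences a partial apply),
       // so the whole batch is safe to submit again.
       return retryBatch(state);
   }
   ```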



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here

Review Comment:
   What is the timeline for this and who's the owner?



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);

Review Comment:
   I think the client addition of the broker should be done as the final step of ZkBrokerRegistration reconciliation in the KRaft controller. Once all the validations are done, I think we should first add a BrokerRegistrationChangeRecord for the addition or removal of a ZK broker. Also, validation itself is probably an async step, since we decided to use heartbeats to see if brokers are ready.
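   
   Roughly the ordering I have in mind, as a sketch (the async validation hook and `appendRecord` helper are hypothetical, not APIs in this PR):
   
   ```java
   // Sketch: reconcile a newly seen ZK broker inside the KRaft controller.
   void reconcileZkBroker(int brokerId) {
       // Validation is async, e.g. driven by heartbeats, so take a callback.
       validateZkBrokerAsync(brokerId, valid -> {
           if (!valid) return;
           // First make the change durable in the metadata log...
           appendRecord(new BrokerRegistrationChangeRecord().setBrokerId(brokerId));
           // ...then, as the final step, wire up the channel to the broker.
           client.addZkBroker(brokerId);
       });
   }
   ```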



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }

Review Comment:
   What would be ideal here? Kill and retry? Or simply enqueue MigrateMetadataEvent again, technically making an async infinite loop in the worst case?
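   
   If the answer is "retry", one option that stays on the event queue is to re-schedule the event with a deferred deadline instead of falling through to DUAL_WRITE. A sketch of a replacement for the catch/finally above (`scheduleDeferred` and `DeadlineFunction` are the APIs PollEvent already uses; the retry counter is hypothetical):
   
   ```java
   // Sketch only: bounded retry instead of unconditionally entering DUAL_WRITE.
   } catch (Throwable t) {
       log.error("Migration failed, scheduling a retry", t);
       consumer.abortMigration();
       if (retries++ < maxRetries) {   // hypothetical bookkeeping on the driver
           long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
           eventQueue.scheduleDeferred("retryMigration",
               new EventQueue.DeadlineFunction(deadline), new MigrateMetadataEvent());
       }
   }
   ```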



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends 
RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData 
data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", 
client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? 
"done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is 
{}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, 
SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> 
futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new 
HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data 
with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", 
brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> 
migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", 
client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }

Review Comment:
   Shouldn't this `finally` block schedule a deferred event?
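
   Something like this sketch is what I had in mind: track whether the
migration actually completed, and retry via a deferred event (mirroring the
PollEvent scheduling pattern) instead of unconditionally moving to dual write.

```java
boolean migrated = false;
try {
    // ... existing readAllMetadata / completeMigration logic ...
    migrated = true;
} catch (Throwable t) {
    log.error("Migration failed", t);
    consumer.abortMigration();
} finally {
    apply("Persist recovery state", client::setMigrationRecoveryState);
    if (migrated) {
        transitionTo(MigrationState.DUAL_WRITE);
    } else {
        // Stay in ZK_MIGRATION and retry after a delay.
        long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
        eventQueue.scheduleDeferred("zk-migration-retry",
            new EventQueue.DeadlineFunction(deadline),
            new MigrateMetadataEvent());
    }
}
```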



##########
metadata/src/main/java/org/apache/kafka/migration/MigrationState.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.kafka.migration;
+
+public enum MigrationState {

Review Comment:
   I think it would be better to illustrate the possible state transitions here.
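
   Based on the transitions visible in this PR, a javadoc sketch along these
lines would help. NOT_READY currently has no incoming transitions, so that
part is a guess.

```java
/**
 * Sketch of the state machine as implemented so far:
 *
 *   UNINITIALIZED --PollEvent (recovery)--------------------> INACTIVE
 *   INACTIVE ------KRaftLeaderEvent (isActive)--------------> NEW_LEADER
 *   NEW_LEADER ----BecomeZkLeaderEvent (migration pending)--> ZK_MIGRATION
 *   ZK_MIGRATION --MigrateMetadataEvent---------------------> DUAL_WRITE
 *   <any> ---------KRaftLeaderEvent (!isActive)-------------> INACTIVE
 *
 * NOT_READY has no incoming edges yet; presumably NEW_LEADER should move
 * there until all ZK brokers report ready.
 */
public enum MigrationState {
    UNINITIALIZED,
    INACTIVE,
    NEW_LEADER,
    NOT_READY,
    ZK_MIGRATION,
    DUAL_WRITE
}
```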



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft 
migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {

Review Comment:
   We should also have a ProcessHeartBeatEvent.
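
   Roughly like this hypothetical sketch (none of these names exist in the PR
yet), so that broker readiness is driven by heartbeats rather than only by
znode change notifications:

```java
// Hypothetical event: record a heartbeat from a ZK broker and re-evaluate
// whether every known broker is ready for migration.
class ProcessHeartbeatEvent implements EventQueue.Event {
    private final int brokerId;

    ProcessHeartbeatEvent(int brokerId) {
        this.brokerId = brokerId;
    }

    @Override
    public void run() throws Exception {
        log.debug("Processed heartbeat from broker {}", brokerId);
        // Readiness re-check would go here, transitioning
        // NOT_READY -> ZK_MIGRATION once all brokers are ready.
    }

    @Override
    public void handleException(Throwable e) {
        log.error("Had an exception in " + this.getClass().getSimpleName(), e);
    }
}
```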



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft 
migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> 
migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, 
topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), 
migrationState -> client.createTopic(topicDelta.name(), topicId, 
topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), 
migrationState -> 
client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), 
topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        
delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) 
-> {
+                            if (brokerRegistrationOpt.isPresent() && 
image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, 
migrationState -> client.createKRaftBroker(brokerId, 
brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, 
migrationState -> client.updateKRaftBroker(brokerId, 
brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, 
migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements 
MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements 
EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends 
RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData 
data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", 
client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? 
"done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is 
{}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, 
SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> 
futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new 
HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data 
with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", 
brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> 
migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", 
client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new 
HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = 
client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);
+                    zkBrokerRegistrations.put(brokerId, broker.get());
+                } else {
+                    throw new IllegalStateException("Saw broker " + brokerId + 
" added, but registration data is missing");
+                }
+            });
+            removed.forEach(brokerId -> {
+                client.removeZkBroker(brokerId);
+                zkBrokerRegistrations.remove(brokerId);
+            });
+
+            // TODO actually verify the IBP and clusterID
+            boolean brokersReady = 
zkBrokerRegistrations.values().stream().allMatch(broker ->
+                broker.isMigrationReady() && broker.ibp().isPresent() && 
broker.clusterId().isPresent());
+            // TODO add some state to track if brokers are ready
+            if (brokersReady) {
+                log.debug("All ZK Brokers are ready for migration.");
+                //transitionTo(MigrationState.READY);
+            } else {
+                log.debug("Some ZK Brokers still not ready for migration.");
+                //transitionTo(MigrationState.INELIGIBLE);
+            }
+            // TODO integrate with ClusterControlManager
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokerIdChangeEvent implements EventQueue.Event {
+        private final int brokerId;
+
+        BrokerIdChangeEvent(int brokerId) {
+            this.brokerId = brokerId;
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO not sure this is expected. Can registration data change at 
runtime?
+            log.debug("Broker {} changed. New registration: {}", brokerId, 
client.readBrokerRegistration(brokerId));
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class KRaftLeaderEvent implements EventQueue.Event {
+        private final boolean isActive;
+        private final int kraftControllerId;
+        private final int kraftControllerEpoch;
+
+        KRaftLeaderEvent(boolean isActive, int kraftControllerId, int 
kraftControllerEpoch) {
+            this.isActive = isActive;
+            this.kraftControllerId = kraftControllerId;
+            this.kraftControllerEpoch = kraftControllerEpoch;
+        }
+
+        @Override
+        public void run() throws Exception {
+            if (migrationState == MigrationState.UNINITIALIZED) {
+                // If we get notified about being the active controller before 
we have initialized, we need
+                // to reschedule this event.
+                eventQueue.append(new PollEvent());
+                eventQueue.append(this);
+                return;
+            }
+
+            if (!isActive) {
+                apply("KRaftLeaderEvent is active", state -> 
state.mergeWithControllerState(ZkControllerState.EMPTY));
+                transitionTo(MigrationState.INACTIVE);
+            } else {
+                // Apply the new KRaft state
+                apply("KRaftLeaderEvent not active", state -> 
state.withNewKRaftController(kraftControllerId, kraftControllerEpoch));
+                // Instead of doing the ZK write directly, schedule as an 
event so that we can easily retry ZK failures
+                transitionTo(MigrationState.NEW_LEADER);
+                eventQueue.append(new BecomeZkLeaderEvent());
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BecomeZkLeaderEvent extends ZkWriteEvent {
+        @Override
+        public void run() throws Exception {
+            ZkControllerState zkControllerState = 
client.claimControllerLeadership(
+                recoveryState.kraftControllerId(), 
recoveryState.kraftControllerEpoch());
+            apply("BecomeZkLeaderEvent", state -> 
state.mergeWithControllerState(zkControllerState));
+
+            if (!recoveryState.zkMigrationComplete()) {
+                transitionTo(MigrationState.ZK_MIGRATION);

Review Comment:
   I think BECOME_LEADER should move to NOT_READY. Based on the heartbeats,
the heartbeat event should then transition the state to either NOT_READY or
ZK_MIGRATION.
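
   In other words, a sketch of the change I am suggesting here:

```java
if (!recoveryState.zkMigrationComplete()) {
    // Wait in NOT_READY until heartbeat handling observes that every ZK
    // broker is ready; that event would then move us to ZK_MIGRATION.
    transitionTo(MigrationState.NOT_READY);
}
```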



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft 
migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> 
migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, 
topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), 
migrationState -> client.createTopic(topicDelta.name(), topicId, 
topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), 
migrationState -> 
client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), 
topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        
delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) 
-> {
+                            if (brokerRegistrationOpt.isPresent() && 
image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, 
migrationState -> client.createKRaftBroker(brokerId, 
brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, 
migrationState -> client.updateKRaftBroker(brokerId, 
brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, 
migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements 
MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements 
EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends 
RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData 
data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", 
client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? 
"done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is 
{}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, 
SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> 
futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new 
HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data 
with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", 
brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> 
migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", 
client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new 
HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = 
client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);

Review Comment:
   Technically, I think this method doesn't need to talk directly to
ClusterControlManager if we do the above.



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft 
migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> 
migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, 
topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), 
migrationState -> client.createTopic(topicDelta.name(), topicId, 
topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), 
migrationState -> 
client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), 
topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        
delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) 
-> {
+                            if (brokerRegistrationOpt.isPresent() && 
image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, 
migrationState -> client.createKRaftBroker(brokerId, 
brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, 
migrationState -> client.updateKRaftBroker(brokerId, 
brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, 
migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements 
MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements 
EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends 
RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData 
data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", 
client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? 
"done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is 
{}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, 
SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> 
futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new 
HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data 
with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", 
brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> 
migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", 
client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new 
HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = 
client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);
+                    zkBrokerRegistrations.put(brokerId, broker.get());
+                } else {
+                    throw new IllegalStateException("Saw broker " + brokerId + 
" added, but registration data is missing");
+                }
+            });
+            removed.forEach(brokerId -> {
+                client.removeZkBroker(brokerId);
+                zkBrokerRegistrations.remove(brokerId);
+            });
+
+            // TODO actually verify the IBP and clusterID
+            boolean brokersReady = 
zkBrokerRegistrations.values().stream().allMatch(broker ->
+                broker.isMigrationReady() && broker.ibp().isPresent() && 
broker.clusterId().isPresent());
+            // TODO add some state to track if brokers are ready
+            if (brokersReady) {
+                log.debug("All ZK Brokers are ready for migration.");
+                //transitionTo(MigrationState.READY);
+            } else {
+                log.debug("Some ZK Brokers still not ready for migration.");
+                //transitionTo(MigrationState.INELIGIBLE);
+            }
+            // TODO integrate with ClusterControlManager
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + 
this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokerIdChangeEvent implements EventQueue.Event {
+        private final int brokerId;
+
+        BrokerIdChangeEvent(int brokerId) {
+            this.brokerId = brokerId;
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO not sure this is expected. Can registration data change at 
runtime?
+            log.debug("Broker {} changed. New registration: {}", brokerId, 
client.readBrokerRegistration(brokerId));
+        }

Review Comment:
   How is the deregistration of brokers handled in ZK then?



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -1772,6 +1895,106 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     }
   }
 
+  // Perform a sequence of updates to ZooKeeper as part of a KRaft dual write. 
In addition to adding a CheckOp on the
+  // controller epoch ZNode, we also include CheckOp/SetDataOp on the 
migration ZNode. This ensures proper fencing
+  // from errant ZK controllers as well as fencing from new KRaft controllers.
+  def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: 
Seq[Req],
+                                                                
expectedControllerZkVersion: Int,
+                                                                
migrationState: MigrationRecoveryState): (Int, Seq[Req#Response]) = {
+
+    if (requests.isEmpty) {
+      throw new IllegalArgumentException("Must specify at least one ZK request 
for a migration operation.")
+    }
+    def wrapMigrationRequest(request: Req, updateMigrationNode: Boolean): 
MultiRequest = {
+      val checkOp = CheckOp(ControllerEpochZNode.path, 
expectedControllerZkVersion)
+      val migrationOp = if (updateMigrationNode) {
+        SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), 
migrationState.migrationZkVersion())
+      } else {
+        CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
+      }
+
+      request match {
+        case CreateRequest(path, data, acl, createMode, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, 
createMode)), ctx)
+        case DeleteRequest(path, version, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
+        case SetDataRequest(path, data, version, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, 
version)), ctx)
+        case _ => throw new IllegalStateException(s"$request does not need 
controller epoch check")
+      }
+    }
+
+    def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: 
OpResult): Int = {
+      val (path: String, data: Option[Array[Byte]], version: Int) = 
migrationOp match {
+        case CheckOp(path, version) => (path, None, version)
+        case SetDataOp(path, data, version) => (path, Some(data), version)
+        case _ => throw new IllegalStateException("Unexpected result on 
/migration znode")
+      }
+
+      migrationResult match {
+        case _: CheckResult => version
+        case setDataResult: SetDataResult => setDataResult.getStat.getVersion
+        case errorResult: ErrorResult =>
+          if (path.equals(MigrationZNode.path)) {
+            val errorCode = Code.get(errorResult.getErr)
+            if (errorCode == Code.BADVERSION) {
+              data match {
+                case Some(value) =>
+                  val failedPayload = MigrationZNode.decode(value, version)
+                  throw new RuntimeException(s"Conditional update on KRaft 
Migration znode failed. Expected zkVersion = ${version}. " +
+                    s"The failed write was: ${failedPayload}. This indicates 
that another KRaft controller is making writes to ZooKeeper.")
+                case None =>
+                  throw new RuntimeException(s"Check op on KRaft Migration 
znode failed. Expected zkVersion = ${version}. " +
+                    s"This indicates that another KRaft controller is making 
writes to ZooKeeper.")
+              }
+            } else if (errorCode == Code.OK) {
+              // what?
+              version
+            } else {
+              throw KeeperException.create(errorCode, path)
+            }
+          } else {
+            throw new RuntimeException(s"Got migration result for incorrect 
path $path")
+          }
+        case _ => throw new RuntimeException(s"Expected either CheckResult, 
SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}")
+      }
+    }
+
+    def unwrapMigrationRequest(response: AsyncResponse): (AsyncResponse, Int) 
= {

Review Comment:
   `unwrapMigrationResponse`?



##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -109,6 +113,13 @@ class KafkaRaftServer(
   }
 
   private val controller: Option[ControllerServer] = if 
(config.processRoles.contains(ControllerRole)) {
+    // TODO clean these up
+    val zkClient = KafkaServer.zkClient("KRaft migration", time, config, 
KafkaServer.zkClientConfigFromKafkaConfig(config))
+    val stateChangeLogger = new StateChangeLogger(-1, inControllerContext = 
false, None)
+    val channelManager = new ControllerChannelManager(() => -1, config, 
Time.SYSTEM, new Metrics(), stateChangeLogger)
+    val migrationClient = new ZkMigrationClient(zkClient, channelManager)

Review Comment:
   Just to note: even though we named the class `ZkMigrationClient`, we seem
to be using it not only to communicate with ZooKeeper but also with
ZooKeeper-based brokers.



##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = {
+      val registration = new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID,
+        Collections.emptyList[Endpoint], Collections.emptyMap[String, VersionRange],
+        Optional.empty(), false, false)
+      new ZkBrokerRegistration(registration, null, null, false)
+  }
+}
+
+class ZkMigrationClient(zkClient: KafkaZkClient,
+                        controllerChannelManager: ControllerChannelManager) extends MigrationClient with Logging {
+
+  def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: Int): ZkControllerState = {
+    val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, kraftControllerEpoch)
+    if (epochZkVersionOpt.isDefined) {
+      new ZkControllerState(kraftControllerId, kraftControllerEpoch, epochZkVersionOpt.get)
+    } else {
+      throw new ControllerMovedException("Cannot claim controller leadership, the controller has moved.")
+    }
+  }
+
+  def migrateTopics(metadataVersion: MetadataVersion,
+                    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+                    brokerIdConsumer: Consumer[Integer]): Unit = {
+    val topics = zkClient.getAllTopicsInCluster()
+    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
+      val partitions = assignments.keys.toSeq
+      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+      val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+      topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+        .setName(topic)
+        .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+      assignments.foreach { case (topicPartition, replicaAssignment) =>
+        replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+        replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+
+        val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition)
+        topicBatch.add(new ApiMessageAndVersion(new PartitionRecord()
+          .setTopicId(topicIdOpt.get)
+          .setPartitionId(topicPartition.partition)
+          .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
+          .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+          .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+          .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+          .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+          .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()), PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      val props = topicConfigs(topic)
+      props.forEach { case (key: Object, value: Object) =>
+        topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.TOPIC.id)
+          .setResourceName(topic)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      recordConsumer.accept(topicBatch)
+    }
+  }
+
+  def migrateBrokerConfigs(metadataVersion: MetadataVersion,
+                           recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+    val batch = new util.ArrayList[ApiMessageAndVersion]()
+    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+      val brokerResource = if (broker == ConfigEntityName.Default) {
+        ""
+      } else {
+        broker
+      }
+      props.forEach { case (key: Object, value: Object) =>
+        batch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.BROKER.id)
+          .setResourceName(brokerResource)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+    }
+    recordConsumer.accept(batch)
+  }
+
+  def migrateClientQuotas(metadataVersion: MetadataVersion,
+                          recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    def migrateEntityType(entityType: String): Unit = {
+      adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
+        val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+        val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+          batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+            .setEntity(List(entity).asJava)
+            .setKey(key)
+            .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+        }
+        recordConsumer.accept(batch)
+      }
+    }
+
+    migrateEntityType(ConfigType.User)
+    migrateEntityType(ConfigType.Client)
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+      // Lifted from ZkAdminManager
+      val components = name.split("/")
+      if (components.size != 3 || components(1) != "clients")
+        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+      val entity = List(
+        new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+        new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+      )
+
+      val batch = new util.ArrayList[ApiMessageAndVersion]()
+      ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+        batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+          .setEntity(entity.asJava)
+          .setKey(key)
+          .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+      recordConsumer.accept(batch)
+    }
+
+    migrateEntityType(ConfigType.Ip)
+  }
+
+  def migrateProducerId(metadataVersion: MetadataVersion,
+                        recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt match {
+      case Some(data) =>
+        val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+        recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
+          .setBrokerEpoch(-1)
+          .setBrokerId(producerIdBlock.assignedBrokerId)
+          .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+      case None => // Nothing to migrate
+    }
+  }
+
+  override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]], brokerIdConsumer: Consumer[Integer]): Unit = {
+    migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
+    migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
+    migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
+    migrateProducerId(MetadataVersion.latest(), batchConsumer)
+  }
+
+  override def watchZkBrokerRegistrations(listener: MigrationClient.BrokerRegistrationListener): Unit = {
+    val brokersHandler = new ZNodeChildChangeHandler() {
+      override val path: String = BrokerIdsZNode.path
+
+      override def handleChildChange(): Unit = listener.onBrokersChange()
+    }
+    System.err.println("Adding /brokers watch")
+    zkClient.registerZNodeChildChangeHandler(brokersHandler)
+
+    def brokerHandler(brokerId: Int): ZNodeChangeHandler = {
+      new ZNodeChangeHandler() {
+        override val path: String = BrokerIdZNode.path(brokerId)
+
+        override def handleDataChange(): Unit = listener.onBrokerChange(brokerId)
+      }
+    }
+
+    val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
+    curBrokerAndEpochs.foreach { case (broker, _) =>
+      System.err.println(s"Adding /brokers/${broker.id} watch")
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerHandler(broker.id))
+    }
+
+    listener.onBrokersChange()
+  }
+
+  override def readBrokerRegistration(brokerId: Int): Optional[ZkBrokerRegistration] = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    if (brokerAndEpoch.isEmpty) {
+      Optional.empty()
+    } else {
+      Optional.of(brokerToBrokerRegistration(brokerAndEpoch.head._1, brokerAndEpoch.head._2))
+    }
+  }
+
+  override def readBrokerIds(): util.Set[Integer] = {
+    zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+  }
+
+  override def addZkBroker(brokerId: Int): Unit = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    controllerChannelManager.addBroker(brokerAndEpoch.head._1)
+  }
+
+  override def removeZkBroker(brokerId: Int): Unit = {
+    controllerChannelManager.removeBroker(brokerId)
+  }
+
+  override def getOrCreateMigrationRecoveryState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.getOrCreateMigrationState(initialState)
+  }
+
+  override def setMigrationRecoveryState(state: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.updateMigrationState(state)
+  }
+
+  override def sendRequestToBroker(brokerId: Int,
+                                   request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+                                   callback: Consumer[AbstractResponse]): Unit = {
+    controllerChannelManager.sendRequest(brokerId, request, callback.accept)
+  }
+
+  override def createTopic(topicName: String, topicId: Uuid, partitions: util.Map[Integer, PartitionRegistration], state: MigrationRecoveryState): MigrationRecoveryState = {
+    val assignments = partitions.asScala.map { case (partitionId, partition) =>
+      new TopicPartition(topicName, partitionId) -> ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+    }
+
+    val createTopicZNode = {
+      val path = TopicZNode.path(topicName)
+      CreateRequest(
+        path,
+        TopicZNode.encode(Some(topicId), assignments),
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+    val createPartitionsZNode = {
+      val path = TopicPartitionsZNode.path(topicName)
+      CreateRequest(
+        path,
+        null,
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+
+    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+      val topicPartition = new TopicPartition(topicName, partitionId)
+      Seq(
+        createTopicPartition(topicPartition),
+        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+      )
+    }
+
+    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
+    val path = TopicPartitionZNode.path(topicPartition)
+    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def createTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): CreateRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def updateTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): SetDataRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+  }
+
+  override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+                                     state: MigrationRecoveryState): MigrationRecoveryState = {
+    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+        val topicPartition = new TopicPartition(topicName, partitionId)
+        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+      }
+    }
+    if (requests.isEmpty) {
+      state
+    } else {
+      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state.controllerZkVersion(), state)
+      responses.foreach(System.err.println)
+      state.withZkVersion(migrationZkVersion)
+    }
+  }
+
+  override def createKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = CreateRequest(brokerInfo.path, brokerInfo.toJsonBytes, zkClient.defaultAcls(brokerInfo.path), CreateMode.PERSISTENT)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
Review Comment:
   Why are we registering the KRaft broker in ZooKeeper?



##########
metadata/src/main/java/org/apache/kafka/migration/MigrationState.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.kafka.migration;
+
+public enum MigrationState {
+    UNINITIALIZED(false),   // Initial state
+    INACTIVE(false),        // State when not the active controller
+    NEW_LEADER(false),      // State after KRaft leader election and before ZK leadership claim

Review Comment:
   NEW_LEADER --> BECOME_LEADER?
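   For example (rename only, comment unchanged):
   
       -    NEW_LEADER(false),      // State after KRaft leader election and before ZK leadership claim
       +    BECOME_LEADER(false),   // State after KRaft leader election and before ZK leadership claim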


