mumrah commented on code in PR #12973:
URL: https://github.com/apache/kafka/pull/12973#discussion_r1047768261


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -166,60 +166,74 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
    * the migration.
    *
    * To ensure that the KRaft controller epoch exceeds the current ZK 
controller epoch, this registration algorithm
-   * uses a conditional update on the /controller_epoch znode. If a new ZK 
controller is elected during this method,
-   * the conditional update on /controller_epoch fails which causes the whole 
multi-op transaction to fail.
+   * uses a conditional update on the /controller and /controller_epoch znodes.
+   *
+   * If a new controller is registered concurrently with this registration, 
one of the two will fail the CAS
+   * operation on /controller_epoch. For KRaft, we have an extra guard against 
the registered KRaft epoch going
+   * backwards. If a KRaft controller had previously registered, an additional 
CAS operation is done on the /controller
+   * ZNode to ensure that the KRaft epoch being registered is newer.
    *
    * @param kraftControllerId ID of the KRaft controller node
    * @param kraftControllerEpoch Epoch of the KRaft controller node
-   * @return An optional of the new zkVersion of /controller_epoch. None if we 
could not register the KRaft controller.
+   * @return An optional of the written epoch and new zkVersion of 
/controller_epoch. None if we could not register the KRaft controller.
    */
-  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): Option[Int] = {
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): Option[(Int, Int)] = {
     val timestamp = time.milliseconds()
     val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, 
e._2.getVersion))
-    val controllerOpt = getControllerId
-    val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO 
Remove this after KAFKA-14436
+    val controllerOpt = getControllerRegistration
+
+    // If we have a KRaft epoch registered in /controller, and it is not 
_older_ than the requested epoch, throw an error.
+    controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
+      if (kraftEpochInZk >= kraftControllerEpoch) {
+        throw new ControllerMovedException(s"Cannot register KRaft controller 
$kraftControllerId with epoch $kraftControllerEpoch " +
+          s"as the current controller register in ZK has the same or newer 
epoch $kraftEpochInZk.")
+      }
+    }
+
     curEpochOpt match {
       case None =>
         throw new IllegalStateException(s"Cannot register KRaft controller 
$kraftControllerId as the active controller " +
           s"since there is no ZK controller epoch present.")
       case Some((curEpoch: Int, curEpochZk: Int)) =>
-        if (curEpoch >= controllerEpochToStore) {
-          // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK
-          throw new IllegalStateException(s"Cannot register KRaft controller 
$kraftControllerId as the active controller " +
-            s"in ZK since its epoch ${controllerEpochToStore} is not higher 
than the current ZK epoch ${curEpoch}.")
+        // TODO KAFKA-14436 Increase the KRaft epoch to be higher than the ZK 
epoch
+        val newControllerEpoch = if (kraftControllerEpoch >= curEpoch) {
+          kraftControllerEpoch
+        } else {
+          curEpoch + 1
         }
 
-        val response = if (controllerOpt.isDefined) {
-          info(s"KRaft controller $kraftControllerId overwriting 
${ControllerZNode.path} to become the active " +
-            s"controller with epoch $controllerEpochToStore. The previous 
controller was ${controllerOpt.get}.")
-          retryRequestUntilConnected(
-            MultiRequest(Seq(
-              SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-              DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
-              CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp),
-                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-          )
-        } else {
-          info(s"KRaft controller $kraftControllerId creating 
${ControllerZNode.path} to become the active " +
-            s"controller with epoch $controllerEpochToStore. There was no 
active controller.")
-          retryRequestUntilConnected(
-            MultiRequest(Seq(
-              SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-              CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp),
-                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-          )
+        val response = controllerOpt match {
+          case Some(controller) =>
+            info(s"KRaft controller $kraftControllerId overwriting 
${ControllerZNode.path} to become the active " +
+              s"controller with ZK epoch $newControllerEpoch. The previous 
controller was ${controller.broker}.")
+            retryRequestUntilConnected(
+              MultiRequest(Seq(
+                SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
+                DeleteOp(ControllerZNode.path, controller.zkVersion),

Review Comment:
   The backoff and retry logic will live in the migration driver code, which 
is still TBD. This client just provides the result (and tries not to throw an 
error).
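
To make that split of responsibilities concrete, here is a minimal sketch of what driver-side retry with exponential backoff could look like. It is an illustration only, not the planned migration driver: `RegistrationRetrySketch`, `retryWithBackoff`, its parameters, and `fakeRegister` are hypothetical names, and the only thing taken from the diff is that the client call returns an `Option[(Int, Int)]` that is `None` when registration did not succeed. Exceptions such as `ControllerMovedException` are deliberately not handled here.

```scala
import scala.annotation.tailrec

object RegistrationRetrySketch {

  // Hypothetical driver-side helper (not part of KafkaZkClient): calls `attempt`
  // until it returns Some(...) or `maxAttempts` calls have been made, sleeping
  // between calls and doubling the sleep up to `maxBackoffMs`.
  @tailrec
  def retryWithBackoff[T](maxAttempts: Int, backoffMs: Long, maxBackoffMs: Long)
                         (attempt: () => Option[T]): Option[T] = {
    attempt() match {
      case result @ Some(_) => result
      case None if maxAttempts <= 1 => None // out of attempts, report failure to the caller
      case None =>
        Thread.sleep(backoffMs)
        retryWithBackoff(maxAttempts - 1, math.min(backoffMs * 2, maxBackoffMs), maxBackoffMs)(attempt)
    }
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Stand-in for zkClient.tryRegisterKRaftControllerAsActiveController(id, epoch):
    // fails twice, then returns a made-up (epoch, zkVersion) pair.
    val fakeRegister: () => Option[(Int, Int)] = () => {
      calls += 1
      if (calls < 3) None else Some((5, 12))
    }
    val result = retryWithBackoff(maxAttempts = 5, backoffMs = 10L, maxBackoffMs = 100L)(fakeRegister)
    println(s"result=$result after $calls calls")
  }
}
```

Keeping the loop outside the client means the client method stays a single conditional multi-op whose outcome is reported as a value, while the caller decides how long to wait and when to give up.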


