junrao commented on code in PR #12271:
URL: https://github.com/apache/kafka/pull/12271#discussion_r893805506


##########
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala:
##########
@@ -315,35 +329,43 @@ class ZkReplicaStateMachine(config: KafkaConfig,
   }
 
   /**
-   * Try to remove a replica from the isr of multiple partitions.
-   * Removing a replica from isr updates partition state in zookeeper.
+   * Fence an existing replica by bumping the leader epoch. The bumped leader epoch
+   * ensures 1) that the follower can no longer fetch with the old epoch, and 2)
+   * that it will accept a `StopReplica` request with the bumped epoch.
    *
-   * @param replicaId The replica being removed from isr of multiple partitions
-   * @param partitions The partitions from which we're trying to remove the replica from isr
+   * - If the replica is the current leader, then the leader will be changed to
+   *   [[LeaderAndIsr.NoLeader]], and it will remain in the ISR.
+   * - If the replica is not the current leader and it is in the ISR, then it
+   *   will be removed from the ISR.
+   * - Otherwise, the epoch will be bumped and the leader and ISR will be unchanged.
+   *
+   * Fencing a replica updates partition state in zookeeper.
+   *
+   * @param replicaId The replica being fenced in multiple partitions
+   * @param partitions The partitions for which we're trying to fence the replica
    * @return A tuple of two elements:
    *         1. The updated Right[LeaderIsrAndControllerEpochs] of all partitions for which we successfully
-   *         removed the replica from isr. Or Left[Exception] corresponding to failed removals that should
-   *         not be retried
+   *         fenced the replica. Or Left[Exception] for failures which should not be retried
    *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
    *         the partition leader updated partition state while the controller attempted to update partition state.
    */
-  private def doRemoveReplicasFromIsr(
+  private def tryFenceReplicas(
     replicaId: Int,
     partitions: Seq[TopicPartition]
   ): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
     val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
-    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
-      result.map { leaderAndIsr =>
-        leaderAndIsr.isr.contains(replicaId)
-      }.getOrElse(false)
-    }
-
-    val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
+    val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrs.flatMap {
       case (partition, result) =>
         result.toOption.map { leaderAndIsr =>
-          val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
-          val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
-          partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+          if (leaderAndIsr.isr.contains(replicaId)) {
+            val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
+            val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
+            partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+          } else {
+            // Even if the replica is not in the ISR, we must bump the epoch to ensure the replica
+            // is fenced from replication and the `StopReplica` can be sent with a bumped epoch.
+            partition -> leaderAndIsr.newEpoch

Review Comment:
   A few comments on this.
   (1) This seems too low a level at which to do this. When we cancel a reassignment, we may remove multiple replicas from a partition. Instead of bumping up the leader epoch for every replica, it's probably better to bump up the leader epoch once. Perhaps it's better to do this at a higher level, in KafkaController.updateCurrentReassignment(). See the sketch after this list.
   (2) Controlled shutdown also goes through the Offline transition. It's possible that the shutting-down broker is out of the ISR. In that case, do we want to bump up the leader epoch?
   (3) The current controlled shutdown logic also seems to have the same issue: its StopReplicaRequest will be ignored. When doing a controlled shutdown for a follower, it seems that we don't bump up the leader epoch, so will the StopReplicaRequest be ignored by the follower?
   (4) Should we do the same fix in KRaft?
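   
   To illustrate (1), here is a rough sketch of bumping the epoch once for all replicas removed by a cancelled reassignment. `fenceRemovedReplicas` is a hypothetical helper name and placement, not existing controller code; it only reuses the `LeaderAndIsr.newLeaderAndIsr` epoch bump already shown in the diff above:
   
```scala
import org.apache.kafka.common.TopicPartition
import kafka.api.LeaderAndIsr

// Hypothetical helper (name and placement are mine), called once per partition
// from KafkaController.updateCurrentReassignment() when a reassignment is
// cancelled, so a single leader-epoch bump fences every removed replica.
private def fenceRemovedReplicas(
  partition: TopicPartition,
  leaderAndIsr: LeaderAndIsr,
  removedReplicas: Set[Int]
): (TopicPartition, LeaderAndIsr) = {
  // If the current leader is among the removed replicas, leave the partition leaderless.
  val newLeader =
    if (removedReplicas.contains(leaderAndIsr.leader)) LeaderAndIsr.NoLeader
    else leaderAndIsr.leader
  // Drop every removed replica from the ISR, but never shrink it to empty
  // (same guard as the single-replica check in the diff above).
  val shrunkenIsr = leaderAndIsr.isr.filterNot(removedReplicas.contains)
  val adjustedIsr = if (shrunkenIsr.isEmpty) leaderAndIsr.isr else shrunkenIsr
  // newLeaderAndIsr bumps the leader epoch once; the bumped epoch both fences
  // fetches with the old epoch and lets StopReplica carry a newer epoch.
  partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}
```
   
   This would have the same fencing effect as the per-replica bump in this method, but with one epoch bump and one zookeeper write per partition.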


