junrao commented on code in PR #12271:
URL: https://github.com/apache/kafka/pull/12271#discussion_r893805506
##########
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala:
##########
@@ -315,35 +329,43 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}
/**
- * Try to remove a replica from the isr of multiple partitions.
- * Removing a replica from isr updates partition state in zookeeper.
+ * Fence an existing replica by bumping the leader epoch. The bumped leader epoch
+ * ensures 1) that the follower can no longer fetch with the old epoch, and 2)
+ * that it will accept a `StopReplica` request with the bumped epoch.
*
- * @param replicaId The replica being removed from isr of multiple partitions
- * @param partitions The partitions from which we're trying to remove the replica from isr
+ * - If the replica is the current leader, then the leader will be changed to
+ * [[LeaderAndIsr.NoLeader]], and it will remain in the ISR.
+ * - If the replica is not the current leader and it is in the ISR, then it
+ * will be removed from the ISR.
+ * - Otherwise, the epoch will be bumped and the leader and ISR will be unchanged.
+ *
+ * Fencing a replica updates partition state in zookeeper.
+ *
+ * @param replicaId The replica being fenced from multiple partitions
+ * @param partitions The partitions from which we're trying to fence the replica from
* @return A tuple of two elements:
* 1. The updated Right[LeaderIsrAndControllerEpochs] of all partitions for which we successfully
- * removed the replica from isr. Or Left[Exception] corresponding to failed removals that should
- * not be retried
+ * fenced the replica. Or Left[Exception] for failures which need to be retries
* 2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
* the partition leader updated partition state while the controller attempted to update partition state.
*/
- private def doRemoveReplicasFromIsr(
+ private def tryFenceReplicas(
replicaId: Int,
partitions: Seq[TopicPartition]
): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
- val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
- result.map { leaderAndIsr =>
- leaderAndIsr.isr.contains(replicaId)
- }.getOrElse(false)
- }
-
- val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
+ val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrs.flatMap {
case (partition, result) =>
result.toOption.map { leaderAndIsr =>
- val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
- val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
- partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+ if (leaderAndIsr.isr.contains(replicaId)) {
+ val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
+ val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
+ partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+ } else {
+ // Even if the replica is not in the ISR. We must bump the epoch to ensure the replica
+ // is fenced from replication and the `StopReplica` can be sent with a bumped epoch.
+ partition -> leaderAndIsr.newEpoch
Review Comment:
A few comments on this.
(1) This seems to be too low a level to do this. When we cancel a
reassignment, we may remove multiple replicas from a partition. Instead of
bumping up the leader epoch for every replica, it's probably better to bump up
the leader epoch once. Perhaps it's better to do this at a higher level, in
KafkaController.updateCurrentReassignment().
(2) Controlled shutdown also goes through the Offline transition. It's possible
that the shutting-down broker is out of the ISR. In that case, do we want to bump
up the leader epoch?
(3) The current controlled shutdown logic also seems to have the same issue:
its StopReplicaRequest will be ignored. When doing a controlled shutdown for a
follower, it seems that we don't bump up the leader epoch. So, will the
StopReplicaRequest be ignored by the follower?
(4) Should we do the same fix in KRaft?
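To make (1) concrete, here is a minimal, self-contained sketch under simplifying assumptions. `SimpleLeaderAndIsr` and `FenceSketch` are hypothetical stand-ins, not Kafka's `LeaderAndIsr` or controller code, and `-1` stands in for `LeaderAndIsr.NoLeader`. It applies the same per-replica leader/ISR adjustment as the diff to every replica being removed from one partition, then bumps the leader epoch exactly once.

```scala
// Hypothetical, simplified stand-in for LeaderAndIsr (the real class has more fields).
final case class SimpleLeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

object FenceSketch {
  // Stands in for LeaderAndIsr.NoLeader.
  val NoLeader: Int = -1

  // Same per-replica adjustment as the diff, but without touching the epoch.
  private def removeReplica(state: SimpleLeaderAndIsr, replicaId: Int): SimpleLeaderAndIsr =
    if (!state.isr.contains(replicaId)) state
    else {
      val newLeader = if (state.leader == replicaId) NoLeader else state.leader
      // Never shrink the ISR to empty: a sole ISR member stays in the ISR.
      val newIsr = if (state.isr.size == 1) state.isr else state.isr.filter(_ != replicaId)
      state.copy(leader = newLeader, isr = newIsr)
    }

  // Fence all replicas being removed from one partition with a single epoch bump,
  // instead of one bump per replica. Replicas outside the ISR still get fenced
  // by the bump, as in the diff's `newEpoch` branch.
  def fenceReplicas(state: SimpleLeaderAndIsr, replicaIds: Seq[Int]): SimpleLeaderAndIsr = {
    val adjusted = replicaIds.foldLeft(state)((s, id) => removeReplica(s, id))
    adjusted.copy(leaderEpoch = state.leaderEpoch + 1)
  }
}
```

For example, removing replicas 2 and 3 from a partition with leader 1, epoch 5 and ISR [1, 2, 3] yields leader 1, ISR [1] and epoch 6, rather than epoch 7 from two per-replica bumps.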