hachikuji commented on a change in pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#discussion_r711231383



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1304,53 +1323,41 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    // This is called from maybeExpandIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
-      // a more constrained state for advancing the HW.
-      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
-    } else {
-      trace(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica")
-    }
+  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+    // When expanding the ISR, we assume that the new replica will make it 
into the ISR
+    // before we receive confirmation that it has. This ensures that the HW 
will already
+    // reflect the updated ISR even if there is a delay before we receive the 
confirmation.
+    // Alternatively, if the update fails, no harm is done since the expanded 
ISR puts
+    // a stricter requirement for advancement of the HW.
+    val isrToSend = isrState.isr + newInSyncReplicaId
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+    val updatedState = PendingExpandIsr(isrState.isr, newInSyncReplicaId, 
newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    // This is called from maybeShrinkIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When shrinking the ISR, we cannot assume that the update will succeed 
as this could erroneously advance the HW
-      // We update pendingInSyncReplicaIds here simply to prevent any further 
ISR updates from occurring until we get
-      // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
-    } else {
-      trace(s"ISR update in-flight, not removing out-of-sync replicas 
$outOfSyncReplicas")
-    }
+  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): 
PendingShrinkIsr = {
+    // When shrinking the ISR, we cannot assume that the update will succeed 
as this could
+    // erroneously advance the HW if the `AlterIsr` were to fail. Hence the 
"maximal ISR"
+    // for `PendingShrinkIsr` is the current ISR.
+    val isrToSend = isrState.isr -- outOfSyncReplicaIds
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+    val updatedState = PendingShrinkIsr(isrState.isr, outOfSyncReplicaIds, 
newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
-    val isrToSend: Set[Int] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + 
newInSyncReplicaId
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- 
outOfSyncReplicaIds
-      case state =>
-        isrChangeListener.markFailed()
-        throw new IllegalStateException(s"Invalid state $state for ISR change 
for partition $topicPartition")
-    }
+  private def alterIsr(proposedIsrState: PendingIsrChange): 
CompletableFuture[LeaderAndIsr] = {
+    debug(s"Submitting ISR state change $proposedIsrState")
+    val future = alterIsrManager.submit(topicPartition, 
proposedIsrState.sentLeaderAndIsr, controllerEpoch)
 
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
-    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState), controllerEpoch)
-
-    val oldState = isrState
-    isrState = proposedIsrState
-
-    if (!alterIsrManager.submit(alterIsrItem)) {
-      // If the ISR manager did not accept our update, we need to revert the 
proposed state.
-      // This can happen if the ISR state was updated by the controller (via 
LeaderAndIsr in ZK-mode or
-      // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request 
still in-flight.
-      isrState = oldState
-      isrChangeListener.markFailed()
-      warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition 
$topicPartition")
-    } else {
-      debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition 
to $proposedIsrState")
+    val callback = handleAlterIsrResponse(proposedIsrState) _

Review comment:
       Yeah, thanks for the nudge. I ended up refactoring the handling logic. 
It seems cleaner now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to