1996fanrui commented on code in PR #28554:
URL: https://github.com/apache/flink/pull/28554#discussion_r3498616956


##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -881,58 +923,177 @@ private CompletableFuture<Void> restoreStateAndGates(
                 INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs);
         IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
 
-        boolean checkpointingDuringRecoveryEnabled =
-                CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
+        recoveryCompletionFuture =
+                CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration())
+                        ? recoverChannelsWithCheckpointing(reader, inputGates)
+                        : recoverChannelsWithoutCheckpointing(reader, inputGates);
+
+        recoveryCompletionFuture.whenComplete((ign, throwable) -> mailboxProcessor.suspend());
+        return recoveryCompletionFuture;
+    }
+
+    private CompletableFuture<Void> recoverChannelsWithCheckpointing(
+            SequentialChannelStateReader reader, IndexedInputGate[] inputGates) {
+        if (inputGates.length == 0) {
+            // No input channels to recover (e.g. a source task). Complete synchronously, exactly
+            // like recoverChannelsWithoutCheckpointing, so that the recovery-completion suspend()
+            // in restoreStateAndGates is enqueued before the restore mailbox loop runs the default
+            // action. The asynchronous chain below relies on mailbox mails that only run while the
+            // restore loop is pumping; with no channel work to do it would merely defer completion,
+            // letting a fast source finish and suspend the restore loop before recovery completes
+            // -- tripping "Mailbox loop interrupted before recovery was finished" in
+            // restoreInternal. There is nothing to checkpoint during recovery here either, so the
+            // trigger goes straight to NO_OP.
+            recoveryCheckpointTrigger = RecoveryCheckpointTrigger.NO_OP;
+            return FutureUtils.completedVoidFuture();
+        }
+        return setRecoveryCheckpointTrigger(RecoveryCheckpointTrigger.NOT_READY)
+                .thenApplyAsync(ign -> fetchChannelState(reader, inputGates), channelIOExecutor)
+                .thenCompose(
+                        state ->
+                                requestPartitions(inputGates, state.isPresent())
+                                        .thenApply(channels -> buildDrainer(state, channels)))
+                .thenCompose(this::drainThroughCheckpointTrigger)
+                .thenRun(() -> setRecoveryCheckpointTrigger(RecoveryCheckpointTrigger.NO_OP));

Review Comment:
   This `NO_OP` swap fires when `drain()` completes, but `drain()` only appends
   the `EndOfFetchedChannelStateEvent` sentinel. A channel doesn't actually leave
   recovery (`inRecovery = false`) until the mailbox polls that sentinel in
   `onRecoveredStateConsumed()`. So there's a window where `trigger == NO_OP`
   while a channel is still `inRecovery`: a barrier arriving in that window gets
   no `RecoveryCheckpointBarrier` inserted, and `collectPreRecoveryBarrier`
   spuriously declines with `TASK_NOT_READY`. That's the bug behind the
   `TASK_NOT_READY` I [flagged
   earlier](https://github.com/apache/flink/pull/28554#discussion_r3486146963);
   the decline is masking this race.

   We should keep the trigger live until every channel has consumed its
   end-of-recovery sentinel. There are two options: gate this swap on all
   channels reaching `inRecovery == false`, or move it next to the existing
   "channel state fully consumed → unblock upstream" point. The two events are
   adjacent, so the second option is simpler.
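
   To make the first option concrete, here is a minimal standalone sketch of
   gating the swap on every channel having consumed its sentinel. It is not
   Flink code: `SketchTrigger`, `RecoveringChannel`, `TriggerGate`, the `ACTIVE`
   state and `recoveryConsumedFuture()` are all hypothetical stand-ins, and
   mailbox threading is left out entirely.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the trigger states; ACTIVE is an assumed "live" state. */
enum SketchTrigger { NOT_READY, ACTIVE, NO_OP }

/** Hypothetical stand-in for an input channel that is replaying recovered state. */
final class RecoveringChannel {
    private final CompletableFuture<Void> recoveryConsumed = new CompletableFuture<>();

    /** True until the end-of-recovery sentinel has been polled (not merely appended). */
    boolean inRecovery() {
        return !recoveryConsumed.isDone();
    }

    /** Models the point where the sentinel is polled and the channel leaves recovery. */
    void onRecoveredStateConsumed() {
        recoveryConsumed.complete(null);
    }

    CompletableFuture<Void> recoveryConsumedFuture() {
        return recoveryConsumed;
    }
}

/** Holds the trigger and swaps it to NO_OP only once no channel is in recovery. */
final class TriggerGate {
    final AtomicReference<SketchTrigger> trigger =
            new AtomicReference<>(SketchTrigger.ACTIVE);

    CompletableFuture<Void> swapWhenAllConsumed(List<RecoveringChannel> channels) {
        CompletableFuture<?>[] consumed =
                channels.stream()
                        .map(RecoveringChannel::recoveryConsumedFuture)
                        .toArray(CompletableFuture[]::new);
        // allOf completes only after the last channel has consumed its sentinel,
        // so the trigger stays live for as long as any channel is in recovery.
        return CompletableFuture.allOf(consumed)
                .thenRun(() -> trigger.set(SketchTrigger.NO_OP));
    }
}
```

   With two channels, the trigger stays `ACTIVE` after the first one calls
   `onRecoveredStateConsumed()` and flips to `NO_OP` only after the second does,
   which is the ordering the current `thenRun` after `drain()` does not
   guarantee.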



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -881,58 +923,177 @@ private CompletableFuture<Void> restoreStateAndGates(
                 INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs);
         IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
 
-        boolean checkpointingDuringRecoveryEnabled =
-                CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
+        recoveryCompletionFuture =
+                CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration())
+                        ? recoverChannelsWithCheckpointing(reader, inputGates)
+                        : recoverChannelsWithoutCheckpointing(reader, inputGates);
+
+        recoveryCompletionFuture.whenComplete((ign, throwable) -> mailboxProcessor.suspend());
+        return recoveryCompletionFuture;
+    }
+
+    private CompletableFuture<Void> recoverChannelsWithCheckpointing(
+            SequentialChannelStateReader reader, IndexedInputGate[] inputGates) {
+        if (inputGates.length == 0) {
+            // No input channels to recover (e.g. a source task). Complete synchronously, exactly
+            // like recoverChannelsWithoutCheckpointing, so that the recovery-completion suspend()
+            // in restoreStateAndGates is enqueued before the restore mailbox loop runs the default
+            // action. The asynchronous chain below relies on mailbox mails that only run while the
+            // restore loop is pumping; with no channel work to do it would merely defer completion,
+            // letting a fast source finish and suspend the restore loop before recovery completes
+            // -- tripping "Mailbox loop interrupted before recovery was finished" in
+            // restoreInternal. There is nothing to checkpoint during recovery here either, so the
+            // trigger goes straight to NO_OP.
+            recoveryCheckpointTrigger = RecoveryCheckpointTrigger.NO_OP;
+            return FutureUtils.completedVoidFuture();
+        }
+        return setRecoveryCheckpointTrigger(RecoveryCheckpointTrigger.NOT_READY)
+                .thenApplyAsync(ign -> fetchChannelState(reader, inputGates), channelIOExecutor)
+                .thenCompose(
+                        state ->
+                                requestPartitions(inputGates, state.isPresent())
+                                        .thenApply(channels -> buildDrainer(state, channels)))
+                .thenCompose(this::drainThroughCheckpointTrigger)
+                .thenRun(() -> setRecoveryCheckpointTrigger(RecoveryCheckpointTrigger.NO_OP));

Review Comment:
   The following is the unexpected exception from the log:
   ```
   Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: RecoveryCheckpointBarrier for checkpoint 7 not yet present in channel InputChannelInfo{gateIdx=0, inputChannelIdx=0} Failure reason: Checkpoint was declined (tasks not ready)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.collectPreRecoveryBarrier(RemoteInputChannel.java:969)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkpointStarted(RemoteInputChannel.java:886)
        at org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate.checkpointStarted(IndexedInputGate.java:41)
        at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.onCheckpointStartedForAllInputs(ChannelState.java:133)
        at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:75)
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:245)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:273)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:242)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:184)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:162)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.processPriorityEvents(CheckpointedInputGate.java:114)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:118)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:415)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:384)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:369)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1183)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1120)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:987)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:969)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:774)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

