1996fanrui commented on code in PR #28554:
URL: https://github.com/apache/flink/pull/28554#discussion_r3498616956
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -881,58 +923,177 @@ private CompletableFuture<Void> restoreStateAndGates(
                 INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs);
         IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
-        boolean checkpointingDuringRecoveryEnabled =
-                CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration());
+        recoveryCompletionFuture =
+                CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(getJobConfiguration())
+                        ? recoverChannelsWithCheckpointing(reader, inputGates)
+                        : recoverChannelsWithoutCheckpointing(reader, inputGates);
+
+        recoveryCompletionFuture.whenComplete((ign, throwable) -> mailboxProcessor.suspend());
+        return recoveryCompletionFuture;
+    }
+
+    private CompletableFuture<Void> recoverChannelsWithCheckpointing(
+            SequentialChannelStateReader reader, IndexedInputGate[] inputGates) {
+        if (inputGates.length == 0) {
+            // No input channels to recover (e.g. a source task). Complete synchronously, exactly
+            // like recoverChannelsWithoutCheckpointing, so that the recovery-completion suspend()
+            // in restoreStateAndGates is enqueued before the restore mailbox loop runs the default
+            // action. The asynchronous chain below relies on mailbox mails that only run while the
+            // restore loop is pumping; with no channel work to do it would merely defer completion,
+            // letting a fast source finish and suspend the restore loop before recovery completes
+            // -- tripping "Mailbox loop interrupted before recovery was finished" in
+            // restoreInternal. There is nothing to checkpoint during recovery here either, so the
+            // trigger goes straight to NO_OP.
+            recoveryCheckpointTrigger = RecoveryCheckpointTrigger.NO_OP;
+            return FutureUtils.completedVoidFuture();
+        }
+        return setRecoveryCheckpointTrigger(RecoveryCheckpointTrigger.NOT_READY)
+                .thenApplyAsync(ign -> fetchChannelState(reader, inputGates), channelIOExecutor)
+                .thenCompose(
+                        state ->
+                                requestPartitions(inputGates, state.isPresent())
+                                        .thenApply(channels -> buildDrainer(state, channels)))
+                .thenCompose(this::drainThroughCheckpointTrigger)
+                .thenRun(() -> setRecoveryCheckpointTrigger(RecoveryCheckpointTrigger.NO_OP));
Review Comment:
This `NO_OP` swap fires when `drain()` completes, but `drain()` only appends
the `EndOfFetchedChannelStateEvent` sentinel. A channel doesn't actually leave
recovery (`inRecovery = false`) until the mailbox polls that sentinel in
`onRecoveredStateConsumed()`. So there's a window where `trigger == NO_OP` while
a channel is still `inRecovery`: a barrier arriving in that window gets no
`RecoveryCheckpointBarrier` inserted, and `collectPreRecoveryBarrier` spuriously
declines with `TASK_NOT_READY`. That's the bug behind the `TASK_NOT_READY` I
[flagged earlier](https://github.com/apache/flink/pull/28554#discussion_r3486146963);
it's masking this race. We should keep the trigger live until every channel has
consumed its end-of-recovery sentinel: either gate this swap on all channels
reaching `inRecovery == false`, or, more simply, move it next to the existing
"channel state fully consumed → unblock upstream" point (the two events are
adjacent).
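
To make the first option concrete, here is a minimal sketch of gating the swap on every channel having consumed its sentinel. It is an illustration only, not a proposed patch: `RecoveryTriggerGate`, `Trigger.LIVE`, `onDrainCompleted()` and `onSentinelConsumed()` are hypothetical names that do not exist in Flink, and `LIVE` stands in for whichever non-`NO_OP` trigger is installed while channels are recovering.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch, not Flink code: keeps the recovery trigger live until
 * every channel has consumed its end-of-recovery sentinel.
 */
final class RecoveryTriggerGate {

    /** Stand-in for the trigger states discussed above (assumed names). */
    enum Trigger { LIVE, NO_OP }

    // Number of channels that have not yet polled their sentinel.
    private final AtomicInteger channelsInRecovery;
    private final AtomicReference<Trigger> trigger = new AtomicReference<>(Trigger.LIVE);

    RecoveryTriggerGate(int channelCount) {
        if (channelCount < 0) {
            throw new IllegalArgumentException("channelCount must be >= 0");
        }
        this.channelsInRecovery = new AtomicInteger(channelCount);
        if (channelCount == 0) {
            // Nothing to recover, so there is nothing to keep the trigger live for.
            trigger.set(Trigger.NO_OP);
        }
    }

    /** Drain finished: sentinels are only appended, so the trigger must stay as it is. */
    void onDrainCompleted() {
        // Intentionally no trigger swap here; that is the race described above.
    }

    /** One channel polled its sentinel and left recovery. */
    void onSentinelConsumed() {
        int remaining = channelsInRecovery.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("more sentinels consumed than channels");
        }
        if (remaining == 0) {
            // Last channel has left recovery; only now is NO_OP safe.
            trigger.set(Trigger.NO_OP);
        }
    }

    Trigger currentTrigger() {
        return trigger.get();
    }

    public static void main(String[] args) {
        RecoveryTriggerGate gate = new RecoveryTriggerGate(2);
        gate.onDrainCompleted();
        System.out.println("after drain: " + gate.currentTrigger());
        gate.onSentinelConsumed();
        System.out.println("after first sentinel: " + gate.currentTrigger());
        gate.onSentinelConsumed();
        System.out.println("after last sentinel: " + gate.currentTrigger());
    }
}
```

The atomics are only there to keep the sketch safe in isolation; if both callbacks run on the mailbox thread, plain fields would do. The second option (moving the swap next to the "unblock upstream" point) would not need the counter at all, assuming that point already fires once per task after the last sentinel.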
Review Comment:
The following is the unexpected exception from the log:
```
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: RecoveryCheckpointBarrier for checkpoint 7 not yet present in channel InputChannelInfo{gateIdx=0, inputChannelIdx=0} Failure reason: Checkpoint was declined (tasks not ready)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.collectPreRecoveryBarrier(RemoteInputChannel.java:969)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkpointStarted(RemoteInputChannel.java:886)
	at org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate.checkpointStarted(IndexedInputGate.java:41)
	at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.onCheckpointStartedForAllInputs(ChannelState.java:133)
	at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:75)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:245)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:273)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:242)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:184)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:162)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.processPriorityEvents(CheckpointedInputGate.java:114)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:118)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:415)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:384)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:369)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1183)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1120)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:987)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:969)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:774)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
```