[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361295#comment-16361295 ]
ASF GitHub Bot commented on FLINK-8533:
---------------------------------------

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5427

@StephanEwen thanks for taking a look; I agree that we should try to avoid a new lifecycle method.

The `initializeState` method on the hook interface gives the hook an unconditional initialization point. In the Pravega case, we would move reader-group (RG) initialization from the client to the server, and always reset the RG to its initial conditions. A subsequent restore may or may not occur.

Assuming we like this approach, let's discuss how to make it work purely with `restoreLatestCheckpointedState`. The ExecutionGraph (EG) does not call `restoreLatestCheckpointedState` upon initial execution, yet that call is what we would need in order to support the new `initializeState` method. Would there be any issue with calling `restoreLatestCheckpointedState` on initial execution? Such symmetry would seem desirable.

In the call trees below, JM is the JobManager and CC is the CheckpointCoordinator.

**Existing approach**:
```
=== initial ===
\-- JM.submitJob
|   \-- EG.scheduleForExecution
=== restart ===
\-- RestartCallback.triggerFullRecovery
|   \-- EG.restart
|   |   \-- CC.restoreLatestCheckpointedState
|   |   \-- EG.scheduleForExecution
```

**Suggested approach**:
```
=== initial ===
\-- JM.submitJob
|   \-- EG.start (** new method **)
|   |   \-- CC.restoreLatestCheckpointedState
|   |   |   \-- Hook.initializeState
|   |   \-- EG.scheduleForExecution
=== restart ===
\-- RestartCallback.triggerFullRecovery
|   \-- EG.restart
|   |   \-- CC.restoreLatestCheckpointedState
|   |   |   \-- Hook.initializeState
|   |   \-- EG.scheduleForExecution
```

> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>
>                 Key: FLINK-8533
>                 URL: https://issues.apache.org/jira/browse/FLINK-8533
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Eron Wright
>            Assignee: Eron Wright
>            Priority: Major
>
> {{MasterTriggerRestoreHook}} enables
> coordination with an external system for taking or restoring checkpoints.
> When execution is restarted from a checkpoint, {{restoreCheckpoint}} is
> called to restore or reinitialize the external system state. There is an
> edge case where the external state is not adequately reinitialized: when
> execution fails _before the first checkpoint_. In that case, the hook is not
> invoked and has no opportunity to restore the external state to its initial
> conditions.
>
> The impact is a loss of exactly-once semantics in this case. For example, in
> the Pravega source function, the reader group state (e.g. stream position
> data) is stored externally. In the normal restore case, the reader group
> state is forcibly rewound to the checkpointed position. In the edge case
> where no checkpoint has yet succeeded, the reader group state is not
> rewound, and consequently some amount of stream data (read before the
> failure) is never reprocessed.
>
> A possible fix would be to introduce an {{initializeState}} method on the
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this
> method would be invoked unconditionally upon hook initialization. The Pravega
> hook would, for example, initialize or forcibly reinitialize the reader group
> state.
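A minimal Java sketch of the semantics proposed above, under stated assumptions: the coordinator's restore path runs on initial execution as well as on every restart, and it always calls the hook's `initializeState` before restoring any checkpoint, so a failure before the first checkpoint still resets the external state. This is a toy model only; `HookLifecycleSketch`, `RestoreHook`, `ToyCoordinator`, `CompletedCheckpoint`, `RecordingHook`, and the `"pos=42"` payload are hypothetical and are not Flink's actual `MasterTriggerRestoreHook` or `CheckpointCoordinator` API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the suggested call flow. All names here are hypothetical
// simplifications, not Flink's actual MasterTriggerRestoreHook or
// CheckpointCoordinator API.
public class HookLifecycleSketch {

    // Hypothetical stand-in for a master hook with the proposed initializeState().
    interface RestoreHook<T> {
        // Called unconditionally, e.g. to reset a Pravega reader group to its initial conditions.
        void initializeState();

        // Called only when a completed checkpoint exists.
        void restoreCheckpoint(long checkpointId, T checkpointData);
    }

    // Minimal record of a completed checkpoint and the hook's data for it.
    static final class CompletedCheckpoint<T> {
        final long checkpointId;
        final T hookData;

        CompletedCheckpoint(long checkpointId, T hookData) {
            this.checkpointId = checkpointId;
            this.hookData = hookData;
        }
    }

    // Hypothetical coordinator; only the restore path is modeled.
    static final class ToyCoordinator<T> {
        private final RestoreHook<T> hook;
        private CompletedCheckpoint<T> latest; // null until the first checkpoint completes

        ToyCoordinator(RestoreHook<T> hook) {
            this.hook = hook;
        }

        void completeCheckpoint(long checkpointId, T hookData) {
            latest = new CompletedCheckpoint<>(checkpointId, hookData);
        }

        // Called on initial execution (the proposed EG.start) and on every restart (EG.restart).
        void restoreLatestCheckpointedState() {
            hook.initializeState(); // unconditional: covers a failure before the first checkpoint
            if (latest != null) {
                hook.restoreCheckpoint(latest.checkpointId, latest.hookData);
            }
        }
    }

    // Hook that records the lifecycle calls it receives.
    static final class RecordingHook implements RestoreHook<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void initializeState() {
            events.add("initializeState");
        }

        @Override
        public void restoreCheckpoint(long checkpointId, String checkpointData) {
            events.add("restoreCheckpoint(" + checkpointId + ", " + checkpointData + ")");
        }
    }

    public static void main(String[] args) {
        RecordingHook hook = new RecordingHook();
        ToyCoordinator<String> cc = new ToyCoordinator<>(hook);

        cc.restoreLatestCheckpointedState(); // initial execution
        cc.restoreLatestCheckpointedState(); // restart before any checkpoint succeeded
        cc.completeCheckpoint(1L, "pos=42");
        cc.restoreLatestCheckpointedState(); // restart after checkpoint 1

        // prints [initializeState, initializeState, initializeState, restoreCheckpoint(1, pos=42)]
        System.out.println(hook.events);
    }
}
```

The design choice in this sketch is to make the reset unconditional and the restore conditional, mirroring how the issue compares the proposal to `CheckpointedFunction::initializeState`: a hook like the Pravega one could reset the reader group on every (re)start and rely on `restoreCheckpoint` to rewind it only when a checkpoint exists.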