[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361295#comment-16361295 ]

ASF GitHub Bot commented on FLINK-8533:
---------------------------------------

Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5427
  
    @StephanEwen, thanks for taking a look. I agree that we should try to avoid a new lifecycle method.
    
    The proposed `initializeState` method on the hook interface would give the hook an unconditional initialization point. In the Pravega case, we would move reader group (RG) initialization from the client to the server and always reset the RG to its initial conditions. A subsequent restore may or may not follow.
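    
    A minimal sketch of what such an unconditional initialization point could look like, assuming a simplified hook contract. `ReinitializableHook`, `ReaderGroup`, `ReaderGroupHook`, and `HookSketch` are hypothetical names for illustration only; they are not Flink's `MasterTriggerRestoreHook` API or Pravega's client API, and the in-memory `ReaderGroup` merely stands in for the externally stored reader group state.
    
    ```java
    import java.util.Optional;
    
    /** Hypothetical hook contract: initializeState always runs, restoreCheckpoint only with a checkpoint. */
    interface ReinitializableHook<T> {
        /** Called unconditionally before every execution attempt (initial or restarted). */
        void initializeState();
    
        /** Called only if a completed checkpoint is available (simplified: no checkpoint ID). */
        void restoreCheckpoint(T checkpointData);
    }
    
    /** Stand-in for externally stored reader group state, reduced to a stream position. */
    class ReaderGroup {
        private long position;
    
        long position() { return position; }
        void advance(long n) { position += n; }
        void resetTo(long p) { position = p; }
    }
    
    /** Hypothetical Pravega-like hook: always resets the RG, then optionally rewinds it. */
    class ReaderGroupHook implements ReinitializableHook<Long> {
        static final long INITIAL_POSITION = 0L;
        private final ReaderGroup readerGroup;
    
        ReaderGroupHook(ReaderGroup readerGroup) { this.readerGroup = readerGroup; }
    
        @Override
        public void initializeState() {
            // Reset to initial conditions; a restore may or may not follow.
            readerGroup.resetTo(INITIAL_POSITION);
        }
    
        @Override
        public void restoreCheckpoint(Long checkpointedPosition) {
            // Forcibly rewind to the checkpointed position.
            readerGroup.resetTo(checkpointedPosition);
        }
    }
    
    public class HookSketch {
        /** Initialize unconditionally, then restore only if a checkpoint exists. */
        static void restore(ReinitializableHook<Long> hook, Optional<Long> latestCheckpointedPosition) {
            hook.initializeState();
            latestCheckpointedPosition.ifPresent(hook::restoreCheckpoint);
        }
    
        public static void main(String[] args) {
            ReaderGroup rg = new ReaderGroup();
            ReaderGroupHook hook = new ReaderGroupHook(rg);
    
            rg.advance(5);                      // read some data, then fail before any checkpoint
            restore(hook, Optional.empty());
            System.out.println(rg.position());  // 0: rewound to initial conditions
    
            rg.advance(7);
            restore(hook, Optional.of(3L));     // a checkpoint at position 3 exists
            System.out.println(rg.position());  // 3: rewound to the checkpoint
        }
    }
    ```
    
    Keeping reset and restore as separate calls lets the hook stay correct whether or not a checkpoint exists, which mirrors how `CheckpointedFunction::initializeState` is always invoked on the task side.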
    
    Assuming we like this approach, let's discuss how to make it work purely with `restoreLatestCheckpointedState`. The ExecutionGraph (EG) does not call `restoreLatestCheckpointedState` upon initial execution, but it would need to in order to support the new `initializeState` method. Would there be any issue with calling `restoreLatestCheckpointedState` on initial execution? The resulting symmetry between initial execution and restart seems desirable.
    
    **Existing approach** (JM = JobManager, EG = ExecutionGraph, CC = CheckpointCoordinator):
    ```
    === initial ===
    \-- JM.submitJob
    |   \-- EG.scheduleForExecution
    
    === restart ===
    \-- RestartCallback.triggerFullRecovery
    |   \-- EG.restart
    |   |   \-- CC.restoreLatestCheckpointedState
    |   |   \-- EG.scheduleForExecution
    ```
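    
    To make the gap concrete, here is a toy model of the existing call sequence under heavy simplification. `ExternalReaderGroup`, `CheckpointCoordinatorModel`, `ExecutionGraphModel`, and `ExistingFlowDemo` are hypothetical stand-ins, not Flink's `ExecutionGraph` or `CheckpointCoordinator`. As described in the issue, the hook is only invoked when a completed checkpoint exists, so a failure before the first checkpoint leaves the external position untouched.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    /** Hypothetical stand-in for the externally stored reader group position. */
    class ExternalReaderGroup {
        long position = 0;
    }
    
    /** Hypothetical coordinator that can only restore from completed checkpoints. */
    class CheckpointCoordinatorModel {
        private final List<Long> completedCheckpoints = new ArrayList<>();
        private final ExternalReaderGroup readerGroup;
    
        CheckpointCoordinatorModel(ExternalReaderGroup readerGroup) { this.readerGroup = readerGroup; }
    
        void completeCheckpoint() { completedCheckpoints.add(readerGroup.position); }
    
        void restoreLatestCheckpointedState() {
            if (completedCheckpoints.isEmpty()) {
                return; // no checkpoint yet: the hook is never invoked
            }
            // Models the hook's restoreCheckpoint: rewind to the latest checkpointed position.
            readerGroup.position = completedCheckpoints.get(completedCheckpoints.size() - 1);
        }
    }
    
    /** Hypothetical execution graph following the "existing approach" call tree. */
    class ExecutionGraphModel {
        private final CheckpointCoordinatorModel coordinator;
    
        ExecutionGraphModel(CheckpointCoordinatorModel coordinator) { this.coordinator = coordinator; }
    
        void scheduleForExecution() { /* task deployment omitted */ }
    
        /** JM.submitJob -> EG.scheduleForExecution (no restore call). */
        void submit() { scheduleForExecution(); }
    
        /** RestartCallback.triggerFullRecovery -> EG.restart. */
        void restart() {
            coordinator.restoreLatestCheckpointedState();
            scheduleForExecution();
        }
    }
    
    public class ExistingFlowDemo {
        public static void main(String[] args) {
            ExternalReaderGroup rg = new ExternalReaderGroup();
            ExecutionGraphModel eg = new ExecutionGraphModel(new CheckpointCoordinatorModel(rg));
    
            eg.submit();
            rg.position += 5;  // read 5 events, then fail before the first checkpoint
            eg.restart();
            // Prints 5: the reader group was not rewound, so events 0..4 are not reprocessed.
            System.out.println(rg.position);
        }
    }
    ```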
    
    **Suggested approach**:
    ```
    === initial ===
    \-- JM.submitJob
    |   \-- EG.start  (** new method **)
    |   |   \-- CC.restoreLatestCheckpointedState
    |   |   |   \-- Hook.initializeState
    |   |   \-- EG.scheduleForExecution
    
    === restart ===
    \-- RestartCallback.triggerFullRecovery
    |   \-- EG.restart
    |   |   \-- CC.restoreLatestCheckpointedState
    |   |   |   \-- Hook.initializeState
    |   |   \-- EG.scheduleForExecution
    ```
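    
    A matching toy model of the suggested call sequence, again using hypothetical stand-ins (`StreamPosition`, `PositionHook`, `CoordinatorWithInit`, `ExecutionGraphWithStart`, `SuggestedFlowDemo`) rather than real Flink classes. The only behavioral change is that `restoreLatestCheckpointedState` always calls `initializeState` first, and the proposed `start` entry point routes initial execution through the same path as `restart`.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    /** Hypothetical stand-in for the externally stored reader group position. */
    class StreamPosition {
        long value = 0;
    }
    
    /** Hypothetical hook: initializeState is unconditional, restoreCheckpoint is conditional. */
    class PositionHook {
        private final StreamPosition position;
    
        PositionHook(StreamPosition position) { this.position = position; }
    
        void initializeState() { position.value = 0; }  // reset to initial conditions
        void restoreCheckpoint(long checkpointed) { position.value = checkpointed; }
    }
    
    /** Hypothetical coordinator following the "suggested approach" call tree. */
    class CoordinatorWithInit {
        private final List<Long> completedCheckpoints = new ArrayList<>();
        private final StreamPosition position;
        private final PositionHook hook;
    
        CoordinatorWithInit(StreamPosition position, PositionHook hook) {
            this.position = position;
            this.hook = hook;
        }
    
        void completeCheckpoint() { completedCheckpoints.add(position.value); }
    
        void restoreLatestCheckpointedState() {
            hook.initializeState();  // always invoked, even before the first checkpoint
            if (!completedCheckpoints.isEmpty()) {
                hook.restoreCheckpoint(completedCheckpoints.get(completedCheckpoints.size() - 1));
            }
        }
    }
    
    /** Hypothetical execution graph with the proposed start() entry point. */
    class ExecutionGraphWithStart {
        private final CoordinatorWithInit coordinator;
    
        ExecutionGraphWithStart(CoordinatorWithInit coordinator) { this.coordinator = coordinator; }
    
        void scheduleForExecution() { /* task deployment omitted */ }
    
        /** JM.submitJob -> EG.start: same restore path as a restart. */
        void start() {
            coordinator.restoreLatestCheckpointedState();
            scheduleForExecution();
        }
    
        /** RestartCallback.triggerFullRecovery -> EG.restart. */
        void restart() {
            coordinator.restoreLatestCheckpointedState();
            scheduleForExecution();
        }
    }
    
    public class SuggestedFlowDemo {
        public static void main(String[] args) {
            StreamPosition pos = new StreamPosition();
            CoordinatorWithInit cc = new CoordinatorWithInit(pos, new PositionHook(pos));
            ExecutionGraphWithStart eg = new ExecutionGraphWithStart(cc);
    
            eg.start();
            pos.value += 5;  // read 5 events, then fail before the first checkpoint
            eg.restart();
            System.out.println(pos.value);  // 0: rewound, so events 0..4 are reprocessed
        }
    }
    ```
    
    Because `start` and `restart` share one code path, the hook sees identical calls on initial execution and on recovery, which is the symmetry asked about above.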


> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>
>                 Key: FLINK-8533
>                 URL: https://issues.apache.org/jira/browse/FLINK-8533
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Eron Wright 
>            Assignee: Eron Wright 
>            Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for
> taking or restoring checkpoints. When execution is restarted from a
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the
> external system state. There is an edge case where the external state is not
> adequately reinitialized, namely when execution fails _before the first
> checkpoint_. In that case, the hook is not invoked and has no opportunity to
> restore the external state to its initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)