[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361128#comment-16361128 ]
ASF GitHub Bot commented on FLINK-8533:
---------------------------------------

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5427

Given the already non-trivial complexity of the `CheckpointCoordinator`, I am wondering if there is a way to do this without adding the `resetForNewExecution()` method. Adding another life cycle method (even if it is not exploited for other purposes currently) would need more involved tests and make future maintenance harder.

Can we reset all hooks in the `restoreLatestCheckpointedState()` method? Would we miss any cases if we do only that?

> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Eron Wright
> Assignee: Eron Wright
> Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for
> taking or restoring checkpoints. When execution is restarted from a
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the
> external system state. There's an edge case where the external state is not
> adequately reinitialized, that is when execution fails _before the first
> checkpoint_. In that case, the hook is not invoked and has no opportunity to
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in
> the Pravega source function, the reader group state (e.g. stream position
> data) is stored externally. In the normal restore case, the reader group
> state is forcibly rewound to the checkpointed position. In the edge case
> where no checkpoint has yet been successful, the reader group state is not
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this
> method would be invoked unconditionally upon hook initialization. The Pravega
> hook would, for example, initialize or forcibly reinitialize the reader group
> state.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
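
For illustration only, the idea raised in the comment (resetting every hook inside the restore path, so that a failure before the first checkpoint still reinitializes external state) could look roughly like the sketch below. This is not Flink code: `SketchHook`, `SketchCoordinator`, `RecordingHook`, and `completeCheckpoint` are hypothetical, simplified stand-ins, and only the method name `restoreLatestCheckpointedState` is borrowed from the discussion. Whether every restart path in the real `CheckpointCoordinator` goes through that method is exactly the open question in the comment, so the sketch assumes it does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified hook; NOT Flink's MasterTriggerRestoreHook interface.
interface SketchHook {
    // Return the external system to its initial conditions.
    void reset();

    // Rewind the external system to the state captured by the given checkpoint.
    void restore(long checkpointId);
}

// Hypothetical coordinator that keeps only what the sketch needs.
final class SketchCoordinator {
    private final List<SketchHook> hooks = new ArrayList<>();
    private Long latestCompletedCheckpoint; // null until a checkpoint succeeds

    void register(SketchHook hook) {
        hooks.add(hook);
    }

    void completeCheckpoint(long checkpointId) {
        latestCompletedCheckpoint = checkpointId;
    }

    // Returns true if a checkpoint was restored, false if none existed yet.
    boolean restoreLatestCheckpointedState() {
        // Reset unconditionally, before checking whether a checkpoint exists,
        // so the "failure before the first checkpoint" case is covered too.
        for (SketchHook hook : hooks) {
            hook.reset();
        }
        if (latestCompletedCheckpoint == null) {
            return false;
        }
        for (SketchHook hook : hooks) {
            hook.restore(latestCompletedCheckpoint);
        }
        return true;
    }
}

// Test double that records the calls it receives, in order.
final class RecordingHook implements SketchHook {
    final List<String> calls = new ArrayList<>();

    @Override
    public void reset() {
        calls.add("reset");
    }

    @Override
    public void restore(long checkpointId) {
        calls.add("restore:" + checkpointId);
    }
}
```

With a `RecordingHook` registered, a restore attempted before any checkpoint has completed records only a reset and returns false. After `completeCheckpoint(7L)`, a second restore records a reset followed by a restore of checkpoint 7. No separate life cycle method is needed in this sketch, at the cost of every restore also performing a reset.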