[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16454651#comment-16454651 ]
ASF GitHub Bot commented on FLINK-8533:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5427#discussion_r184477785

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -1009,6 +1013,11 @@ public boolean restoreLatestCheckpointedState(
     LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);

    + // Instruct the master hooks to initialize their state (unconditionally)
    + LOG.debug("Initializing the master hooks.");
    --- End diff --

    Can you elaborate a bit on why this initialization happens in all cases?

    An alternative would be to have a `reset()` method or so on the master hook that is called further below, in the `if (latest == null)` code block.

    Initializing the state seems a tad unintuitive to me here - I somehow assume that an init function is called once, while this one is called on every restore.

> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>
>                 Key: FLINK-8533
>                 URL: https://issues.apache.org/jira/browse/FLINK-8533
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Eron Wright
>            Assignee: Eron Wright
>            Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for
> taking or restoring checkpoints. When execution is restarted from a
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the
> external system state. There's an edge case where the external state is not
> adequately reinitialized, that is when execution fails _before the first
> checkpoint_. In that case, the hook is not invoked and has no opportunity to
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in
> the Pravega source function, the reader group state (e.g. stream position
> data) is stored externally. In the normal restore case, the reader group
> state is forcibly rewound to the checkpointed position. In the edge case
> where no checkpoint has yet been successful, the reader group state is not
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this
> method would be invoked unconditionally upon hook initialization. The Pravega
> hook would, for example, initialize or forcibly reinitialize the reader group
> state.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
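
For illustration, the `reset()` alternative raised in the review comment could look roughly like the sketch below. It is a self-contained toy, not Flink code: `SketchMasterHook`, `RecordingHook`, `HookRestoreSketch`, and `restoreLatest` are hypothetical names, the checkpoint payload is simplified to a `String`, and a `null` checkpoint id stands in for the `if (latest == null)` branch. The hook is reset only when there is no checkpoint to restore from, rather than being initialized on every restore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the hook interface under discussion; not Flink's actual API.
interface SketchMasterHook {
    // Called when a completed checkpoint exists: rewind external state to it.
    void restoreCheckpoint(long checkpointId, String checkpointData);

    // Assumed addition: called when there is no checkpoint to restore from.
    // A no-op default keeps existing hook implementations source-compatible.
    default void reset() {}
}

// Toy hook that mimics an external reader group by tracking a stream position.
class RecordingHook implements SketchMasterHook {
    final List<String> calls = new ArrayList<>();
    long position = 42L; // pretend the readers advanced before the failure

    @Override
    public void restoreCheckpoint(long checkpointId, String checkpointData) {
        position = Long.parseLong(checkpointData); // rewind to the checkpointed position
        calls.add("restore:" + checkpointId);
    }

    @Override
    public void reset() {
        position = 0L; // back to initial conditions
        calls.add("reset");
    }
}

class HookRestoreSketch {
    // Simplified restore path; latestCheckpointId == null models "no checkpoint yet".
    // Returns true if state was restored from a checkpoint, false otherwise.
    static boolean restoreLatest(Long latestCheckpointId, String checkpointData,
                                 List<? extends SketchMasterHook> hooks) {
        if (latestCheckpointId == null) {
            for (SketchMasterHook hook : hooks) {
                hook.reset();
            }
            return false;
        }
        for (SketchMasterHook hook : hooks) {
            hook.restoreCheckpoint(latestCheckpointId, checkpointData);
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingHook hook = new RecordingHook();
        // Failure before the first checkpoint: the hook still gets a chance to rewind.
        restoreLatest(null, null, Arrays.asList(hook));
        System.out.println(hook.calls + " position=" + hook.position);
    }
}
```

With this shape, the edge case from the issue description (a failure before the first successful checkpoint) still reaches the hook, while a normal restore goes through `restoreCheckpoint` only.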