[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16454652#comment-16454652 ]
ASF GitHub Bot commented on FLINK-8533:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5427#discussion_r184481704

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
@@ -291,6 +341,34 @@ else if (!allowUnmatchedState) {
 	this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
 }

+@Override
+public void initializeState(HookInitializationContext context) throws Exception {
+	final Thread thread = Thread.currentThread();
--- End diff --

We could use a utility like:
```java
public static void withContextClassLoader(ClassLoader cl, Runnable r) {
    final Thread thread = Thread.currentThread();
    final ClassLoader originalClassLoader = thread.getContextClassLoader();
    try {
        // Install the class loader passed in as 'cl' for the duration of the call.
        thread.setContextClassLoader(cl);
        r.run();
    } finally {
        // Always restore the previous context class loader.
        thread.setContextClassLoader(originalClassLoader);
    }
}

// Note: Runnable.run() cannot throw checked exceptions, so a call that declares
// 'throws Exception' would need a throwing variant of the functional interface.
withContextClassLoader(userClassLoader, () -> {
    hook.initializeState(context);
});
```

How about adding that to `org.apache.flink.util.LambdaUtil`?

> Support MasterTriggerRestoreHook state reinitialization
> -------------------------------------------------------
>
>                 Key: FLINK-8533
>                 URL: https://issues.apache.org/jira/browse/FLINK-8533
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Eron Wright
>            Assignee: Eron Wright
>            Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for
> taking or restoring checkpoints. When execution is restarted from a
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the
> external system state. There's an edge case where the external state is not
> adequately reinitialized, that is when execution fails _before the first
> checkpoint_. In that case, the hook is not invoked and has no opportunity to
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in
> the Pravega source function, the reader group state (e.g. stream position
> data) is stored externally. In the normal restore case, the reader group
> state is forcibly rewound to the checkpointed position. In the edge case
> where no checkpoint has yet been successful, the reader group state is not
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this
> method would be invoked unconditionally upon hook initialization. The Pravega
> hook would, for example, initialize or forcibly reinitialize the reader group
> state.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
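
For illustration, here is a minimal, self-contained sketch of the context-class-loader utility suggested in the review comment. It assumes a hypothetical `ThrowingRunnable` interface so that a callback declaring a checked exception (as `initializeState` does with `throws Exception`) can be passed as a lambda. The class name `ContextClassLoaderSketch` and the demo in `main` are illustrative assumptions, not the code that was merged into Flink.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch only; the names here are assumptions, not Flink's API.
final class ContextClassLoaderSketch {

    /** Hypothetical interface: like Runnable, but allowed to throw a checked exception. */
    @FunctionalInterface
    public interface ThrowingRunnable<E extends Throwable> {
        void run() throws E;
    }

    /**
     * Runs the given action with 'cl' installed as the thread's context class
     * loader and restores the previous loader afterwards, even on failure.
     */
    public static <E extends Throwable> void withContextClassLoader(
            ClassLoader cl, ThrowingRunnable<E> action) throws E {
        final Thread thread = Thread.currentThread();
        final ClassLoader original = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(cl);
            action.run();
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) throws Exception {
        final ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Stand-in for the user-code class loader held by the wrapped hook.
        final ClassLoader userClassLoader = new URLClassLoader(new URL[0], before);

        withContextClassLoader(userClassLoader, () -> {
            // Stand-in for hook.initializeState(context).
            boolean installed =
                    Thread.currentThread().getContextClassLoader() == userClassLoader;
            System.out.println("user class loader installed: " + installed);
        });

        boolean restored = Thread.currentThread().getContextClassLoader() == before;
        System.out.println("original class loader restored: " + restored);
    }
}
```

Making the helper generic in the thrown type lets the caller keep its own `throws` clause instead of wrapping checked exceptions in a `RuntimeException`.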