gaoyunhaii commented on a change in pull request #15820: URL: https://github.com/apache/flink/pull/15820#discussion_r623824848
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -530,6 +532,10 @@ protected Counter setupNumRecordsInCounter(StreamOperator streamOperator) { @Override public void restore() throws Exception { + runWithCleanUpOnFail(this::executeRestore); Review comment: Would it be better to make `restore` also `final`, and let the subclasses to override `executeRestore` instead ? Since the exception handling should be always required ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -609,42 +615,54 @@ private void ensureNotCanceled() { @Override public final void invoke() throws Exception { - try { - // Allow invoking method 'invoke' without having to call 'restore' before it. - if (!isRunning) { - LOG.debug("Restoring during invoke will be called."); - restore(); - } + runWithCleanUpOnFail(this::executeInvoke); + + cleanUpInvoke(); + } + + private void executeInvoke() throws Exception { + // Allow invoking method 'invoke' without having to call 'restore' before it. + if (!isRunning) { + LOG.debug("Restoring during invoke will be called."); + restore(); Review comment: Would it be better to call `executeRestore` here to avoid nested exception handling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org