gaoyunhaii commented on a change in pull request #15820:
URL: https://github.com/apache/flink/pull/15820#discussion_r623824848



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -530,6 +532,10 @@ protected Counter setupNumRecordsInCounter(StreamOperator 
streamOperator) {
 
     @Override
     public void restore() throws Exception {
+        runWithCleanUpOnFail(this::executeRestore);

Review comment:
       Would it be better to make `restore` also `final`, and let the 
subclasses to override `executeRestore` instead ? Since the exception handling 
should be always required

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -609,42 +615,54 @@ private void ensureNotCanceled() {
 
     @Override
     public final void invoke() throws Exception {
-        try {
-            // Allow invoking method 'invoke' without having to call 'restore' 
before it.
-            if (!isRunning) {
-                LOG.debug("Restoring during invoke will be called.");
-                restore();
-            }
+        runWithCleanUpOnFail(this::executeInvoke);
+
+        cleanUpInvoke();
+    }
+
+    private void executeInvoke() throws Exception {
+        // Allow invoking method 'invoke' without having to call 'restore' 
before it.
+        if (!isRunning) {
+            LOG.debug("Restoring during invoke will be called.");
+            restore();

Review comment:
       Would it be better to call `executeRestore` here to avoid nested 
exception handling?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to