hwanju commented on a change in pull request #14499:
URL: https://github.com/apache/flink/pull/14499#discussion_r555379990



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -527,38 +528,48 @@ protected void beforeInvoke() throws Exception {
 
        @Override
        public final void invoke() throws Exception {
+               // Monitor user codes from exiting JVM. This can be done in a finer-grained way like enclosing user callback
+               // functions individually (open, close, invoke, run, cancel, etc), but as Flink-managed exit is not performed
+               // and expected in this invoke function anyhow, we can monitor exiting JVM for entire scope. Cancel and
+               // checkpoint are handled in their own methods.
+               FlinkUserSecurityManager.monitorSystemExitForCurrentThread();
                try {
-                       beforeInvoke();
+                       try {
+                               beforeInvoke();
 
-                       // final check to exit early before starting to run
-                       if (canceled) {
-                               throw new CancelTaskException();
-                       }
+                               // final check to exit early before starting to run
+                               if (canceled) {
+                                       throw new CancelTaskException();
+                               }
 
-                       // let the task do its work
-                       runMailboxLoop();
+                               // let the task do its work
+                               runMailboxLoop();
 
-                       // if this left the run() method cleanly despite the fact that this was canceled,
-                       // make sure the "clean shutdown" is not attempted
-                       if (canceled) {
-                               throw new CancelTaskException();
-                       }
+                               // if this left the run() method cleanly despite the fact that this was canceled,
+                               // make sure the "clean shutdown" is not attempted
+                               if (canceled) {
+                                       throw new CancelTaskException();
+                               }
 
-                       afterInvoke();
-               }
-               catch (Throwable invokeException) {
-                       failing = !canceled;
-                       try {
-                               cleanUpInvoke();
-                       }
-                       // TODO: investigate why Throwable instead of Exception is used here.
-                       catch (Throwable cleanUpException) {
-                               Throwable throwable = ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
-                               ExceptionUtils.rethrowException(throwable);
+                               afterInvoke();
+                       } catch (Throwable invokeException) {
+                               failing = !canceled;
+                               try {
+                                       cleanUpInvoke();
+                               }
+                               // TODO: investigate why Throwable instead of Exception is used here.
+                               catch (Throwable cleanUpException) {
+                                       Throwable throwable = ExceptionUtils.firstOrSuppressed(
+                                               cleanUpException,
+                                               invokeException);
+                                       ExceptionUtils.rethrowException(throwable);
+                               }
+                               ExceptionUtils.rethrowException(invokeException);
                        }
-                       ExceptionUtils.rethrowException(invokeException);
+                       cleanUpInvoke();
+               } finally {
+                       FlinkUserSecurityManager.unmonitorSystemExitForCurrentThread();

Review comment:
       Moved the protection to `Task.doRun` so that it covers the `invoke()` call.
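
       For illustration only, here is a minimal sketch of what scoping the monitoring in the caller might look like. It is not the actual `Task.doRun` code: the class `ExitMonitorSketch`, the `Invokable` interface, the `ThreadLocal` flag, and the `observe` helper are all hypothetical stand-ins, and the real `FlinkUserSecurityManager` is only imitated by two static methods with the same names as in the diff above.

```java
final class ExitMonitorSketch {

    // Hypothetical stand-in for the per-thread state kept by FlinkUserSecurityManager.
    private static final ThreadLocal<Boolean> MONITORED = ThreadLocal.withInitial(() -> false);

    static void monitorSystemExitForCurrentThread() {
        MONITORED.set(true);
    }

    static void unmonitorSystemExitForCurrentThread() {
        MONITORED.set(false);
    }

    static boolean isMonitored() {
        return MONITORED.get();
    }

    /** Hypothetical stand-in for the task's invokable (user code runs inside invoke()). */
    interface Invokable {
        void invoke() throws Exception;
    }

    /** Hypothetical analogue of Task.doRun: the caller owns the monitoring scope. */
    static void doRun(Invokable invokable) throws Exception {
        monitorSystemExitForCurrentThread();
        try {
            invokable.invoke();
        } finally {
            // Always reset the flag, whether invoke() returned or threw.
            unmonitorSystemExitForCurrentThread();
        }
    }

    /** Returns {flag seen inside invoke(), flag seen after doRun returned or threw}. */
    static boolean[] observe(boolean fail) {
        boolean[] seen = new boolean[2];
        try {
            doRun(() -> {
                seen[0] = isMonitored();
                if (fail) {
                    throw new IllegalStateException("simulated user-code failure");
                }
            });
        } catch (Exception expected) {
            // Only reached when fail == true.
        }
        seen[1] = isMonitored();
        return seen;
    }

    public static void main(String[] args) {
        for (boolean fail : new boolean[] {false, true}) {
            boolean[] seen = observe(fail);
            System.out.println("fail=" + fail + " inside=" + seen[0] + " after=" + seen[1]);
        }
    }
}
```

       With the scope owned by the caller, `invoke()` itself would not need the extra outer `try`/`finally` shown in the hunk above, assuming nothing else in `StreamTask` relies on it.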




