kl0u commented on a change in pull request #12339:
URL: https://github.com/apache/flink/pull/12339#discussion_r430918068



##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -71,31 +74,44 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
                JobClient jobClient = executeAsync(streamGraph);
 
                JobExecutionResult jobExecutionResult;
-               if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
-                       CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
+               List<JobListener> jobListeners =  getJobListeners();

Review comment:
   I would suggest moving the body of the "old" `execute` into a separate method, something like `getJobExecutionResult()`, so that the actual `execute` contains only the `try {} catch {}` logic. It would look something like this:
   
   ```
   private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
       checkNotNull(jobClient);

       if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
           CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
                   jobClient.getJobExecutionResult(getUserClassloader());

           if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
               Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
                       () -> {
                           // wait a smidgen to allow the async request to go through before
                           // the jvm exits
                           jobClient.cancel().get(1, TimeUnit.SECONDS);
                       },
                       StreamContextEnvironment.class.getSimpleName(),
                       LOG);
               jobExecutionResultFuture.whenComplete((ignored, throwable) ->
                       ShutdownHookUtil.removeShutdownHook(
                               shutdownHook, StreamContextEnvironment.class.getSimpleName(), LOG));
           }

           JobExecutionResult jobExecutionResult = jobExecutionResultFuture.get();
           System.out.println(jobExecutionResult);
           return jobExecutionResult;
       } else {
           return new DetachedJobExecutionResult(jobClient.getJobID());
       }
   }

   @Override
   public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
       JobClient jobClient = executeAsync(streamGraph);

       List<JobListener> jobListeners = getJobListeners();
       try {
           final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
           jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
           return jobExecutionResult;
       } catch (Throwable t) {
           jobListeners.forEach(jobListener -> {
               jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
           });
           ExceptionUtils.rethrowException(t);

           // never reached, only make javac happy
           return null;
       }
   }
   ```
   
   Please check that this is correct because I have not verified it.
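
To make the intended control flow easier to check in isolation, here is a minimal, Flink-free sketch of the same pattern: compute the result in one place, then notify every listener with either the result or the failure before propagating it. Everything in it is a hypothetical stand-in, not Flink API: `ResultListener` plays the role of `JobListener`, the `Callable<String>` plays the role of `getJobExecutionResult(jobClient)`, and a plain `throw t` replaces `ExceptionUtils.rethrowException(t)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class ListenerNotificationSketch {

    /** Hypothetical stand-in for a listener with an onJobExecuted(result, error) callback. */
    interface ResultListener {
        void onJobExecuted(String result, Throwable error);
    }

    /**
     * Runs the job and tells every listener about the outcome:
     * (result, null) on success, (null, error) on failure.
     * The original failure is rethrown after the listeners have been notified.
     */
    static String executeAndNotify(Callable<String> job, List<ResultListener> listeners)
            throws Exception {
        try {
            final String result = job.call();
            listeners.forEach(listener -> listener.onJobExecuted(result, null));
            return result;
        } catch (Throwable t) {
            listeners.forEach(listener -> listener.onJobExecuted(null, t));
            // Precise rethrow: the try block can only throw Exception or unchecked throwables.
            throw t;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> events = new ArrayList<>();
        List<ResultListener> listeners = new ArrayList<>();
        listeners.add((result, error) ->
                events.add(error == null ? "ok:" + result : "failed:" + error.getMessage()));

        // Success path: listeners see the result.
        executeAndNotify(() -> "done", listeners);

        // Failure path: listeners see the error, and the caller still gets the exception.
        try {
            executeAndNotify(() -> { throw new IllegalStateException("boom"); }, listeners);
        } catch (IllegalStateException expected) {
            // propagated after the listeners were notified
        }

        System.out.println(events); // prints [ok:done, failed:boom]
    }
}
```

One thing the sketch makes visible: because the success-path notification sits inside the `try`, a listener that throws there lands in the `catch`, so listeners would be notified a second time, now with the error. Whether that is acceptable for `JobListener` is worth deciding explicitly.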




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

