kl0u commented on a change in pull request #12339: URL: https://github.com/apache/flink/pull/12339#discussion_r430918068
########## File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java ########## @@ -71,31 +74,44 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { JobClient jobClient = executeAsync(streamGraph); JobExecutionResult jobExecutionResult; - if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { - CompletableFuture<JobExecutionResult> jobExecutionResultFuture = + List<JobListener> jobListeners = getJobListeners(); Review comment: I would suggest to put all the "old" `execute` in another method, something like `getJobExecutionResult()` and then, the actual `execute` can contain only the `try {} catch {}` logic. This will look sth like: ``` private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception { checkNotNull(jobClient); if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { CompletableFuture<JobExecutionResult> jobExecutionResultFuture = jobClient.getJobExecutionResult(getUserClassloader()); if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { Thread shutdownHook = ShutdownHookUtil.addShutdownHook( () -> { // wait a smidgen to allow the async request to go through before // the jvm exits jobClient.cancel().get(1, TimeUnit.SECONDS); }, StreamContextEnvironment.class.getSimpleName(), LOG); jobExecutionResultFuture.whenComplete((ignored, throwable) -> ShutdownHookUtil.removeShutdownHook(shutdownHook, StreamContextEnvironment.class.getSimpleName(), LOG)); } JobExecutionResult jobExecutionResult = jobExecutionResultFuture.get(); System.out.println(jobExecutionResult); return jobExecutionResult; } else { return new DetachedJobExecutionResult(jobClient.getJobID()); } } @Override public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { JobClient jobClient = executeAsync(streamGraph); List<JobListener> jobListeners = getJobListeners(); try { final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient); jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null)); return jobExecutionResult; } catch (Throwable t) { jobListeners.forEach(jobListener -> { jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)); }); ExceptionUtils.rethrowException(t); // never reached, only make javac happy return null; } } ``` Please check that this is correct because I have not verified it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org