WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1302368927
########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ########## @@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception { public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { final JobID jobID = jobGraph.getJobID(); log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID); - - try { - if (isInGloballyTerminalState(jobID)) { - log.warn( - "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.", - jobGraph.getName(), - jobID, - Arrays.stream(JobStatus.values()) - .filter(JobStatus::isGloballyTerminalState) - .map(JobStatus::name) - .collect(Collectors.joining(", "))); - return FutureUtils.completedExceptionally( - DuplicateJobSubmissionException.ofGloballyTerminated(jobID)); - } else if (jobManagerRunnerRegistry.isRegistered(jobID) - || submittedAndWaitingTerminationJobIDs.contains(jobID)) { - // job with the given jobID is not terminated, yet - return FutureUtils.completedExceptionally( - DuplicateJobSubmissionException.of(jobID)); - } else if (isPartialResourceConfigured(jobGraph)) { - return FutureUtils.completedExceptionally( - new JobSubmissionException( - jobID, - "Currently jobs is not supported if parts of the vertices have " - + "resources configured. The limitation will be removed in future versions.")); - } else { - return internalSubmitJob(jobGraph); - } - } catch (FlinkException e) { - return FutureUtils.completedExceptionally(e); - } + return isInGloballyTerminalState(jobID) + .thenCompose( + isTerminated -> { + if (isTerminated) { + log.warn( + "Ignoring JobGraph submission '{}' ({}) because the job already " + + "reached a globally-terminal state (i.e. {}) in a " + + "previous execution.", + jobGraph.getName(), + jobID, + Arrays.stream(JobStatus.values()) + .filter(JobStatus::isGloballyTerminalState) + .map(JobStatus::name) + .collect(Collectors.joining(", "))); + return FutureUtils.completedExceptionally( + DuplicateJobSubmissionException.ofGloballyTerminated( + jobID)); + } else if (jobManagerRunnerRegistry.isRegistered(jobID) Review Comment: Thanks for your detailed explanation! 😄 I've modified the `thenCompose` to `thenComposeAsync`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ########## @@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception { public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { final JobID jobID = jobGraph.getJobID(); log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID); - - try { - if (isInGloballyTerminalState(jobID)) { - log.warn( - "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.", - jobGraph.getName(), - jobID, - Arrays.stream(JobStatus.values()) - .filter(JobStatus::isGloballyTerminalState) - .map(JobStatus::name) - .collect(Collectors.joining(", "))); - return FutureUtils.completedExceptionally( - DuplicateJobSubmissionException.ofGloballyTerminated(jobID)); - } else if (jobManagerRunnerRegistry.isRegistered(jobID) - || submittedAndWaitingTerminationJobIDs.contains(jobID)) { - // job with the given jobID is not terminated, yet - return FutureUtils.completedExceptionally( - DuplicateJobSubmissionException.of(jobID)); - } else if (isPartialResourceConfigured(jobGraph)) { - return FutureUtils.completedExceptionally( - new JobSubmissionException( - jobID, - "Currently jobs is not supported if parts of the vertices have " - + "resources configured. The limitation will be removed in future versions.")); - } else { - return internalSubmitJob(jobGraph); - } - } catch (FlinkException e) { - return FutureUtils.completedExceptionally(e); - } + return isInGloballyTerminalState(jobID) + .thenCompose( + isTerminated -> { + if (isTerminated) { + log.warn( + "Ignoring JobGraph submission '{}' ({}) because the job already " + + "reached a globally-terminal state (i.e. {}) in a " + + "previous execution.", + jobGraph.getName(), + jobID, + Arrays.stream(JobStatus.values()) + .filter(JobStatus::isGloballyTerminalState) + .map(JobStatus::name) + .collect(Collectors.joining(", "))); + return FutureUtils.completedExceptionally( + DuplicateJobSubmissionException.ofGloballyTerminated( + jobID)); + } else if (jobManagerRunnerRegistry.isRegistered(jobID) Review Comment: Thanks for your detailed explanation! 😄 I've modified the `thenCompose` to `thenComposeAsync`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org