WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1302368927


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception {
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time 
timeout) {
         final JobID jobID = jobGraph.getJobID();
         log.info("Received JobGraph submission '{}' ({}).", 
jobGraph.getName(), jobID);
-
-        try {
-            if (isInGloballyTerminalState(jobID)) {
-                log.warn(
-                        "Ignoring JobGraph submission '{}' ({}) because the 
job already reached a globally-terminal state (i.e. {}) in a previous 
execution.",
-                        jobGraph.getName(),
-                        jobID,
-                        Arrays.stream(JobStatus.values())
-                                .filter(JobStatus::isGloballyTerminalState)
-                                .map(JobStatus::name)
-                                .collect(Collectors.joining(", ")));
-                return FutureUtils.completedExceptionally(
-                        
DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
-            } else if (jobManagerRunnerRegistry.isRegistered(jobID)
-                    || submittedAndWaitingTerminationJobIDs.contains(jobID)) {
-                // job with the given jobID is not terminated, yet
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.of(jobID));
-            } else if (isPartialResourceConfigured(jobGraph)) {
-                return FutureUtils.completedExceptionally(
-                        new JobSubmissionException(
-                                jobID,
-                                "Currently jobs is not supported if parts of 
the vertices have "
-                                        + "resources configured. The 
limitation will be removed in future versions."));
-            } else {
-                return internalSubmitJob(jobGraph);
-            }
-        } catch (FlinkException e) {
-            return FutureUtils.completedExceptionally(e);
-        }
+        return isInGloballyTerminalState(jobID)
+                .thenCompose(
+                        isTerminated -> {
+                            if (isTerminated) {
+                                log.warn(
+                                        "Ignoring JobGraph submission '{}' 
({}) because the job already "
+                                                + "reached a globally-terminal 
state (i.e. {}) in a "
+                                                + "previous execution.",
+                                        jobGraph.getName(),
+                                        jobID,
+                                        Arrays.stream(JobStatus.values())
+                                                
.filter(JobStatus::isGloballyTerminalState)
+                                                .map(JobStatus::name)
+                                                .collect(Collectors.joining(", 
")));
+                                return FutureUtils.completedExceptionally(
+                                        
DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                jobID));
+                            } else if 
(jobManagerRunnerRegistry.isRegistered(jobID)

Review Comment:
   Thanks for your detailed explanation! 😄 I've changed `thenCompose` to `thenComposeAsync`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception {
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time 
timeout) {
         final JobID jobID = jobGraph.getJobID();
         log.info("Received JobGraph submission '{}' ({}).", 
jobGraph.getName(), jobID);
-
-        try {
-            if (isInGloballyTerminalState(jobID)) {
-                log.warn(
-                        "Ignoring JobGraph submission '{}' ({}) because the 
job already reached a globally-terminal state (i.e. {}) in a previous 
execution.",
-                        jobGraph.getName(),
-                        jobID,
-                        Arrays.stream(JobStatus.values())
-                                .filter(JobStatus::isGloballyTerminalState)
-                                .map(JobStatus::name)
-                                .collect(Collectors.joining(", ")));
-                return FutureUtils.completedExceptionally(
-                        
DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
-            } else if (jobManagerRunnerRegistry.isRegistered(jobID)
-                    || submittedAndWaitingTerminationJobIDs.contains(jobID)) {
-                // job with the given jobID is not terminated, yet
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.of(jobID));
-            } else if (isPartialResourceConfigured(jobGraph)) {
-                return FutureUtils.completedExceptionally(
-                        new JobSubmissionException(
-                                jobID,
-                                "Currently jobs is not supported if parts of 
the vertices have "
-                                        + "resources configured. The 
limitation will be removed in future versions."));
-            } else {
-                return internalSubmitJob(jobGraph);
-            }
-        } catch (FlinkException e) {
-            return FutureUtils.completedExceptionally(e);
-        }
+        return isInGloballyTerminalState(jobID)
+                .thenCompose(
+                        isTerminated -> {
+                            if (isTerminated) {
+                                log.warn(
+                                        "Ignoring JobGraph submission '{}' 
({}) because the job already "
+                                                + "reached a globally-terminal 
state (i.e. {}) in a "
+                                                + "previous execution.",
+                                        jobGraph.getName(),
+                                        jobID,
+                                        Arrays.stream(JobStatus.values())
+                                                
.filter(JobStatus::isGloballyTerminalState)
+                                                .map(JobStatus::name)
+                                                .collect(Collectors.joining(", 
")));
+                                return FutureUtils.completedExceptionally(
+                                        
DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                jobID));
+                            } else if 
(jobManagerRunnerRegistry.isRegistered(jobID)

Review Comment:
   Thanks for your detailed explanation! 😄 I've changed `thenCompose` to `thenComposeAsync`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to