WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1302585103


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception {
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
         final JobID jobID = jobGraph.getJobID();
         log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);
-
-        try {
-            if (isInGloballyTerminalState(jobID)) {
-                log.warn(
-                        "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.",
-                        jobGraph.getName(),
-                        jobID,
-                        Arrays.stream(JobStatus.values())
-                                .filter(JobStatus::isGloballyTerminalState)
-                                .map(JobStatus::name)
-                                .collect(Collectors.joining(", ")));
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
-            } else if (jobManagerRunnerRegistry.isRegistered(jobID)
-                    || submittedAndWaitingTerminationJobIDs.contains(jobID)) {
-                // job with the given jobID is not terminated, yet
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.of(jobID));
-            } else if (isPartialResourceConfigured(jobGraph)) {
-                return FutureUtils.completedExceptionally(
-                        new JobSubmissionException(
-                                jobID,
-                                "Currently jobs is not supported if parts of the vertices have "
-                                        + "resources configured. The limitation will be removed in future versions."));
-            } else {
-                return internalSubmitJob(jobGraph);
-            }
-        } catch (FlinkException e) {
-            return FutureUtils.completedExceptionally(e);
-        }
+        return isInGloballyTerminalState(jobID)
+                .thenCompose(
+                        isTerminated -> {
+                            if (isTerminated) {
+                                log.warn(
+                                        "Ignoring JobGraph submission '{}' ({}) because the job already "
+                                                + "reached a globally-terminal state (i.e. {}) in a "
+                                                + "previous execution.",
+                                        jobGraph.getName(),
+                                        jobID,
+                                        Arrays.stream(JobStatus.values())
+                                                .filter(JobStatus::isGloballyTerminalState)
+                                                .map(JobStatus::name)
+                                                .collect(Collectors.joining(", ")));
+                                return FutureUtils.completedExceptionally(
+                                        DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                jobID));
+                            } else if (jobManagerRunnerRegistry.isRegistered(jobID)

Review Comment:
   😼 I have added the changes in the [comparison](https://github.com/apache/flink/compare/5a7329d02ce51f53bfd4ab8c2f6e30b5271c539f..ee05e69244ad3254c05a567e37e6be7d85ce0881). Really sorry for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
