eemario commented on code in PR #27686:
URL: https://github.com/apache/flink/pull/27686#discussion_r2871994461


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -334,9 +338,21 @@ protected Dispatcher(
 
         this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory);
 
-        this.recoveredJobs = new HashSet<>(recoveredJobs);
+        for (ExecutionPlan executionPlan : recoveredJobs) {
+            final JobID jobId = executionPlan.getJobID();
+            final ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);

Review Comment:
   A missing application ID can only occur in test cases that submit jobs without an
associated application. I'll investigate whether there's a better way to handle them.
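A minimal sketch of one way the loop could index recovered plans, assuming its goal is to group job IDs by `ApplicationID` and keep plans without an application (the test-only case described in the comment above) in a separate set rather than under a `null` key. `RecoveredPlan`, `RecoveredJobsIndex`, and the use of `String` in place of `JobID`/`ApplicationID` are hypothetical stand-ins, not the Dispatcher's actual fields.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for ExecutionPlan: a job ID plus an optional application ID.
final class RecoveredPlan {
    final String jobId;
    final Optional<String> applicationId;

    RecoveredPlan(String jobId, Optional<String> applicationId) {
        this.jobId = jobId;
        this.applicationId = applicationId;
    }
}

// Hypothetical index built from the recovered plans; not the Dispatcher's real fields.
final class RecoveredJobsIndex {
    // Job IDs grouped by the application they belong to.
    final Map<String, Set<String>> jobsByApplication = new HashMap<>();
    // Jobs recovered without an application (per the review, only seen in tests).
    final Set<String> jobsWithoutApplication = new HashSet<>();

    static RecoveredJobsIndex of(List<RecoveredPlan> recoveredPlans) {
        RecoveredJobsIndex index = new RecoveredJobsIndex();
        for (RecoveredPlan plan : recoveredPlans) {
            // Branch on the Optional instead of collapsing it to null with orElse(null).
            if (plan.applicationId.isPresent()) {
                index.jobsByApplication
                        .computeIfAbsent(plan.applicationId.get(), id -> new HashSet<>())
                        .add(plan.jobId);
            } else {
                index.jobsWithoutApplication.add(plan.jobId);
            }
        }
        return index;
    }
}
```

Keeping the application-less jobs in their own set makes the test-only path explicit, so production code never has to reason about a `null` application ID.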



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java:
##########
@@ -493,7 +524,9 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
                     jobStatus == JobStatus.FAILED || jobStatus == JobStatus.SUSPENDED);
             long failureTime = System.currentTimeMillis();
             failureInfo = new ErrorInfo(throwable, failureTime);
-            timestamps[jobStatus.ordinal()] = failureTime;
+            if (endTimestamp < 0) {

Review Comment:
   The intent of this change is to use the current timestamp as failureTime 
only when endTimestamp is invalid. The code and comments have been updated for 
clarity.
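A minimal sketch of the fallback the comment describes, assuming a negative `endTimestamp` marks it as invalid (as the `endTimestamp < 0` check suggests) and that a valid `endTimestamp` otherwise becomes the failure time. `SketchJobStatus`, `FailureTimeSketch`, and its methods are hypothetical; the clock is passed in as `nowMillis` so the logic can be exercised without calling `System.currentTimeMillis()` directly.

```java
// Hypothetical stand-in for JobStatus; only the constants used here.
enum SketchJobStatus { RUNNING, FAILED, SUSPENDED }

final class FailureTimeSketch {
    // Use endTimestamp when it is valid; fall back to "now" only when it is negative.
    static long resolveFailureTime(long endTimestamp, long nowMillis) {
        return endTimestamp < 0 ? nowMillis : endTimestamp;
    }

    // Records the resolved failure time in the terminal status's slot, mirroring the
    // original timestamps[jobStatus.ordinal()] = failureTime assignment.
    static long[] recordFailure(SketchJobStatus status, long endTimestamp, long nowMillis) {
        long[] timestamps = new long[SketchJobStatus.values().length];
        timestamps[status.ordinal()] = resolveFailureTime(endTimestamp, nowMillis);
        return timestamps;
    }
}
```

In non-test code the caller would pass `System.currentTimeMillis()` as `nowMillis`.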



##########
flink-clients/src/test/java/org/apache/flink/client/deployment/application/PackagedProgramApplicationITCase.java:
##########
@@ -195,10 +194,8 @@ public JobResultStore getJobResultStore() {
             awaitClusterStopped(cluster);
         }
 
-        FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
-                .as(
-                        "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
-                .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+        // submission should succeed

Review Comment:
   Updated comments to explain the change.



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -122,18 +145,33 @@ public CompletableFuture<JobClient> execute(
 
         // Skip resubmission if the job is recovered via HA.
         // When optJobId is present, the streamGraph's ID is deterministically derived from it. In
-        // this case, if the streamGraph's ID is in submittedJobIds, it means the job was submitted
-        // in a previous run and should not be resubmitted.
-        if (optJobId.isPresent() && submittedJobIds.contains(streamGraph.getJobID())) {
-            return getJobClientFuture(streamGraph.getJobID(), userCodeClassloader);
+        // this case, if the streamGraph's ID is in terminalJobIds or submittedJobIds, it means the
+        // job was submitted in a previous run and should not be resubmitted.
+        if (optJobId.isPresent()) {
+            final JobID actualJobId = streamGraph.getJobID();
+            if (terminalJobIds.contains(actualJobId)) {
+                LOG.info("Job {} reached a terminal state in a previous execution.", actualJobId);
+                return getJobClientFuture(actualJobId, userCodeClassloader);
+            }
+
+            if (recoveredJobIds.contains(actualJobId)) {
+                final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
+                return dispatcherGateway
+                        .recoverJob(actualJobId, timeout)
+                        .thenCompose(
+                                ack -> {
+                                    LOG.info("Job {} is recovered successfully.", actualJobId);
+                                    return getJobClientFuture(actualJobId, userCodeClassloader);
+                                });
+            }
         }
 
         return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
     }
 
     private CompletableFuture<JobClient> getJobClientFuture(
             final JobID jobId, final ClassLoader userCodeClassloader) {
-        LOG.info("Job {} was recovered successfully.", jobId);
+        submittedJobIds.add(jobId);

Review Comment:
   Updated the name.
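A minimal sketch of the resubmission decision in the `execute` hunk, assuming three outcomes: reuse a job that already reached a terminal state, recover a job known from HA and only then hand out a client, or submit fresh. `SubmissionPath`, `ResubmissionSketch`, and `String` IDs are hypothetical stand-ins for `JobID` and the executor's real types; the `recoverJob` function stands in for `dispatcherGateway.recoverJob(jobId, timeout)` with the timeout omitted.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical outcome of the resubmission check in execute().
enum SubmissionPath { REUSE_TERMINAL, RECOVER, SUBMIT }

final class ResubmissionSketch {
    private final Set<String> terminalJobIds;
    private final Set<String> recoveredJobIds;
    // Stand-in for dispatcherGateway.recoverJob(jobId, timeout).
    private final Function<String, CompletableFuture<Void>> recoverJob;
    // Mirrors the diff: every job handed a client is recorded as submitted.
    final Set<String> submittedJobIds = new HashSet<>();

    ResubmissionSketch(
            Set<String> terminalJobIds,
            Set<String> recoveredJobIds,
            Function<String, CompletableFuture<Void>> recoverJob) {
        this.terminalJobIds = terminalJobIds;
        this.recoveredJobIds = recoveredJobIds;
        this.recoverJob = recoverJob;
    }

    // Only a fixed job ID makes the stream graph's ID deterministic, so only then can it
    // match a previous run. Terminal jobs are checked before recovered ones.
    SubmissionPath choosePath(Optional<String> fixedJobId, String actualJobId) {
        if (fixedJobId.isPresent()) {
            if (terminalJobIds.contains(actualJobId)) {
                return SubmissionPath.REUSE_TERMINAL;
            }
            if (recoveredJobIds.contains(actualJobId)) {
                return SubmissionPath.RECOVER;
            }
        }
        return SubmissionPath.SUBMIT;
    }

    // Completes with the ID of the job a client would be created for.
    CompletableFuture<String> execute(Optional<String> fixedJobId, String actualJobId) {
        switch (choosePath(fixedJobId, actualJobId)) {
            case REUSE_TERMINAL:
                return clientFuture(actualJobId);
            case RECOVER:
                // Hand out a client only after the dispatcher acknowledges recovery.
                return recoverJob.apply(actualJobId).thenCompose(ack -> clientFuture(actualJobId));
            default:
                // Hypothetical: a real executor would submit the pipeline here.
                return clientFuture(actualJobId);
        }
    }

    private CompletableFuture<String> clientFuture(String jobId) {
        submittedJobIds.add(jobId);
        return CompletableFuture.completedFuture(jobId);
    }
}
```

Chaining with `thenCompose` keeps the recovery acknowledgement and the client creation in one asynchronous pipeline, so a failed recovery surfaces as a failed client future instead of a client for a job that was never recovered.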



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorServiceLoader.java:
##########
@@ -55,16 +59,21 @@ public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoa
      */
     public EmbeddedExecutorServiceLoader(
             final Collection<JobID> submittedJobIds,
+            final Collection<JobID> recoveredJobIds,

Review Comment:
   Updated variable names and comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
