eemario commented on code in PR #27686:
URL: https://github.com/apache/flink/pull/27686#discussion_r2871993722
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java:
##########
@@ -68,13 +69,22 @@ public class JobResult implements Serializable {
/** Stores the cause of the job failure, or {@code null} if the job finished successfully. */
@Nullable private final SerializedThrowable serializedThrowable;
+ @Nullable private final ApplicationID applicationId;
Review Comment:
Updated rest_v1_dispatcher.yml to reflect the change.
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java:
##########
@@ -573,12 +584,12 @@ private void runApplicationEntryPoint(
getApplicationId(),
getAllRecoveredJobInfos());
- if (applicationJobIds.isEmpty()) {
+ if (submittedJobIds.isEmpty()) {
Review Comment:
Yes, the list contains all jobs. The variable has been renamed back to
`applicationJobIds`, and explanatory comments have been added.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -685,12 +739,35 @@ public void notifyApplicationStatusChange(
"Application %s does not exist.",
applicationId);
checkState(
- !applicationArchivingFutures.containsKey(applicationId),
- "The application (" + applicationId + ") has already been archived.");
+ !applicationTerminationFutures.get(applicationId).isDone(),
+ "The application (" + applicationId + ") has already terminated.");
+
+ AbstractApplication application = applications.get(applicationId);
+ Set<JobID> remainingRecoveredJobIds =
Review Comment:
Updated the variable name.
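For readers following this hunk: the guard now rejects a status change once the application's termination future has completed, instead of checking for an archiving future. A minimal sketch of that guard pattern follows. It assumes a simplified registry with hypothetical names (`ApplicationRegistry`, `register`, `onTerminalStatus`, `isTerminated`) and `String` IDs in place of Flink's `ApplicationID`. For brevity it completes the future immediately on a terminal status, whereas the real Dispatcher may complete it only after further work; it is not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the Dispatcher's bookkeeping;
// String IDs replace Flink's ApplicationID for illustration only.
class ApplicationRegistry {
    private final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

    // Submission path: every registered application gets a fresh,
    // not-yet-completed termination future.
    void register(String applicationId) {
        terminationFutures.put(applicationId, new CompletableFuture<>());
    }

    // Guard analogous to the one in notifyApplicationStatusChange: the
    // application must exist and must not have terminated already.
    void onTerminalStatus(String applicationId) {
        CompletableFuture<Void> future = terminationFutures.get(applicationId);
        if (future == null) {
            throw new IllegalStateException(
                    String.format("Application %s does not exist.", applicationId));
        }
        if (future.isDone()) {
            throw new IllegalStateException(
                    "The application (" + applicationId + ") has already terminated.");
        }
        // Simplification: mark the application as terminated right away.
        future.complete(null);
    }

    boolean isTerminated(String applicationId) {
        CompletableFuture<Void> future = terminationFutures.get(applicationId);
        return future != null && future.isDone();
    }
}
```

Keying the check on a per-application future also gives other components something to wait on (`thenRun`, `join`) until the application has fully terminated.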
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -664,11 +713,16 @@ private CompletableFuture<Acknowledge> internalSubmitApplication(
log.info("Submitting application '{}' ({}).", application.getName(), applicationId);
applications.put(applicationId, application);
- Set<JobID> jobs = recoveredApplicationJobIds.remove(applicationId);
- if (jobs != null) {
- jobs.forEach(application::addJob);
- }
application.registerStatusListener(this);
+ applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
+
+ // cleanup dirty job results for the application
Review Comment:
The cleanup is performed during application submission to ensure that the
JobClient is available when the application execution skips resubmitting
already-terminated jobs and retrieves the JobClient directly. The comments
have been updated for clarity.
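The ordering argument in this reply can be illustrated with a toy model. If dirty job results are cleaned up when the application is submitted, then by the time the application's entry point runs, its already-terminated jobs are known, and execution can skip resubmitting them. The sketch assumes `String` IDs and hypothetical names (`SubmissionTimeCleanup`, `addDirtyResult`, `submitApplication`, `executeJob`). The returned strings merely stand in for "retrieve a client for the existing job" versus "submit the job" and do not reflect Flink's JobClient API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a "dirty" job result is one that reached a terminal
// state but whose cleanup has not run yet.
class SubmissionTimeCleanup {
    // dirty job results: job ID -> owning application ID
    private final Map<String, String> dirtyJobResults = new HashMap<>();
    // jobs that terminated and were cleaned up; no resubmission needed
    private final Set<String> terminatedJobs = new HashSet<>();
    // job IDs whose cleanup ran (iteration order of the map is unspecified)
    final List<String> cleanedUp = new ArrayList<>();

    void addDirtyResult(String jobId, String applicationId) {
        dirtyJobResults.put(jobId, applicationId);
    }

    // Submission: clean up every dirty result owned by this application
    // before its entry point runs.
    void submitApplication(String applicationId) {
        dirtyJobResults.entrySet().removeIf(entry -> {
            if (!entry.getValue().equals(applicationId)) {
                return false;
            }
            cleanedUp.add(entry.getKey());
            terminatedJobs.add(entry.getKey());
            return true;
        });
    }

    // Execution: an already-terminated job is not resubmitted; the caller
    // obtains a handle to the existing result instead.
    String executeJob(String jobId) {
        return terminatedJobs.contains(jobId) ? "retrieved:" + jobId : "submitted:" + jobId;
    }
}
```

Dirty results owned by other applications are left alone until those applications are submitted.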
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -624,6 +648,31 @@ public CompletableFuture<Acknowledge> submitJob(ExecutionPlan executionPlan, Dur
getMainThreadExecutor(jobID));
}
+ @Override
+ public CompletableFuture<Acknowledge> recoverJob(JobID jobId, Duration timeout) {
+ try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+ log.info("Received job recovery request for job {}.", jobId);
+ }
+ final ExecutionPlan executionPlan = recoveredJobs.remove(jobId);
+ if (executionPlan == null) {
+ return FutureUtils.completedExceptionally(
+ new JobSubmissionException(jobId, "Cannot find the recovered job."));
+ }
+
+ final ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);
+ checkState(recoveredJobIdsByApplicationId.containsKey(applicationId));
+
+ runRecoveredJob(executionPlan, false);
+
+ Set<JobID> jobIds = recoveredJobIdsByApplicationId.get(applicationId);
Review Comment:
Updated the variable names.
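The visible part of `recoverJob` follows a take-once pattern. It removes the plan from the recovered set, fails the returned future if the plan is absent, checks that the job is tracked under its application, and then runs it. A minimal sketch of those steps, assuming `String` IDs, a hypothetical `RecoveredPlan` record, and a generic exception in place of `JobSubmissionException`. The hunk is cut off after the per-application set is looked up, so the sketch leaves that set untouched rather than guessing what the PR does with it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a recovered execution plan; String IDs replace
// Flink's JobID and ApplicationID.
record RecoveredPlan(String jobId, Optional<String> applicationId) {}

class RecoverySketch {
    final Map<String, RecoveredPlan> recoveredJobs = new HashMap<>();
    final Map<String, Set<String>> recoveredJobIdsByApplicationId = new HashMap<>();
    final List<String> runJobs = new ArrayList<>();

    CompletableFuture<Void> recoverJob(String jobId) {
        // remove() makes recovery take-once: a second call finds nothing.
        RecoveredPlan plan = recoveredJobs.remove(jobId);
        if (plan == null) {
            // Generic exception standing in for JobSubmissionException.
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("Cannot find the recovered job " + jobId + "."));
        }
        // HashMap permits a null key, matching orElse(null) for jobs without an application.
        String applicationId = plan.applicationId().orElse(null);
        if (!recoveredJobIdsByApplicationId.containsKey(applicationId)) {
            throw new IllegalStateException("No recovered jobs tracked for " + applicationId);
        }
        runJobs.add(jobId); // placeholder for runRecoveredJob(executionPlan, false)
        // Per-application set bookkeeping is not shown in the hunk; left as-is here.
        return CompletableFuture.completedFuture(null);
    }
}
```

Because the plan is removed before it is run, a duplicate recovery request for the same job fails instead of starting the job twice.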