JunRuiLee commented on code in PR #25492: URL: https://github.com/apache/flink/pull/25492#discussion_r1797532597
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStore.java: ########## @@ -109,153 +110,163 @@ public void stop() throws Exception { synchronized (lock) { if (running) { running = false; - LOG.info("Stopping DefaultJobGraphStore."); + LOG.info("Stopping DefaultExecutionPlanStore."); Exception exception = null; try { - jobGraphStateHandleStore.releaseAll(); + executionPlanStateHandleStore.releaseAll(); } catch (Exception e) { exception = e; } try { - jobGraphStoreWatcher.stop(); + executionPlanStoreWatcher.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } if (exception != null) { throw new FlinkException( - "Could not properly stop the DefaultJobGraphStore.", exception); + "Could not properly stop the DefaultExecutionPlanStore.", exception); } } } } @Nullable @Override - public JobGraph recoverJobGraph(JobID jobId) throws Exception { + public ExecutionPlan recoverExecutionPlan(JobID jobId) throws Exception { checkNotNull(jobId, "Job ID"); - LOG.debug("Recovering job graph {} from {}.", jobId, jobGraphStateHandleStore); + LOG.debug("Recovering execution plan {} from {}.", jobId, executionPlanStateHandleStore); - final String name = jobGraphStoreUtil.jobIDToName(jobId); + final String name = executionPlanStoreUtil.jobIDToName(jobId); synchronized (lock) { verifyIsRunning(); boolean success = false; - RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle; + RetrievableStateHandle<ExecutionPlan> executionPlanRetrievableStateHandle; try { try { - jobGraphRetrievableStateHandle = jobGraphStateHandleStore.getAndLock(name); + executionPlanRetrievableStateHandle = + executionPlanStateHandleStore.getAndLock(name); } catch (StateHandleStore.NotExistException ignored) { success = true; return null; } catch (Exception e) { throw new FlinkException( - "Could not retrieve the submitted job graph state handle " + "Could not retrieve the submitted execution plan state handle " + "for " + name - + " from the submitted job graph store.", + + " from the submitted execution plan store.", e); } - JobGraph jobGraph; + ExecutionPlan executionPlan; try { - jobGraph = jobGraphRetrievableStateHandle.retrieveState(); + executionPlan = executionPlanRetrievableStateHandle.retrieveState(); } catch (ClassNotFoundException cnfe) { throw new FlinkException( - "Could not retrieve submitted JobGraph from state handle under " + "Could not retrieve submitted ExecutionPlan from state handle under " + name + ". This indicates that you are trying to recover from state written by an " + "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe); } catch (IOException ioe) { throw new FlinkException( - "Could not retrieve submitted JobGraph from state handle under " + "Could not retrieve submitted ExecutionPlan from state handle under " + name + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle " + "store.", ioe); } - addedJobGraphs.add(jobGraph.getJobID()); + addedExecutionPlans.add(executionPlan.getJobID()); - LOG.info("Recovered {}.", jobGraph); + LOG.info("Recovered {}.", executionPlan); success = true; - return jobGraph; + return executionPlan; } finally { if (!success) { - jobGraphStateHandleStore.release(name); + executionPlanStateHandleStore.release(name); } } } } @Override - public void putJobGraph(JobGraph jobGraph) throws Exception { - checkNotNull(jobGraph, "Job graph"); + public void putExecutionPlan(ExecutionPlan executionPlan) throws Exception { + checkNotNull(executionPlan, "Execution Plan"); - final JobID jobID = jobGraph.getJobID(); - final String name = jobGraphStoreUtil.jobIDToName(jobID); + final JobID jobID = executionPlan.getJobID(); + final String name = executionPlanStoreUtil.jobIDToName(jobID); - LOG.debug("Adding job graph {} to {}.", jobID, jobGraphStateHandleStore); + LOG.debug("Adding execution plan {} to {}.", jobID, executionPlanStateHandleStore); boolean success = false; while (!success) { synchronized (lock) { verifyIsRunning(); - final R currentVersion = jobGraphStateHandleStore.exists(name); + final R currentVersion = executionPlanStateHandleStore.exists(name); if (!currentVersion.isExisting()) { try { - jobGraphStateHandleStore.addAndLock(name, jobGraph); + executionPlanStateHandleStore.addAndLock(name, executionPlan); - addedJobGraphs.add(jobID); + addedExecutionPlans.add(jobID); success = true; } catch (StateHandleStore.AlreadyExistException ignored) { - LOG.warn("{} already exists in {}.", jobGraph, jobGraphStateHandleStore); + LOG.warn( + "{} already exists in {}.", + executionPlan, + executionPlanStateHandleStore); } - } else if (addedJobGraphs.contains(jobID)) { + } else if (addedExecutionPlans.contains(jobID)) { try { - jobGraphStateHandleStore.replace(name, currentVersion, jobGraph); - LOG.info("Updated {} in {}.", jobGraph, getClass().getSimpleName()); + executionPlanStateHandleStore.replace(name, currentVersion, executionPlan); + LOG.info("Updated {} in {}.", executionPlan, getClass().getSimpleName()); success = true; } catch (StateHandleStore.NotExistException ignored) { - LOG.warn("{} does not exists in {}.", jobGraph, jobGraphStateHandleStore); + LOG.warn( + "{} does not exists in {}.", + executionPlan, + executionPlanStateHandleStore); } } else { throw new IllegalStateException( - "Trying to update a graph you didn't " - + "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before."); + "Trying to update a execution plan you didn't " + + "#getAllSubmittedExecutionPlans() or #putExecutionPlan() yourself before."); } } } - LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore); + LOG.info("Added {} to {}.", executionPlan, executionPlanStateHandleStore); } @Override public void putJobResourceRequirements( JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception { synchronized (lock) { - @Nullable final JobGraph jobGraph = recoverJobGraph(jobId); - if (jobGraph == null) { + @Nullable final ExecutionPlan executionPlan = recoverExecutionPlan(jobId); + if (executionPlan == null) { throw new NoSuchElementException( String.format( - "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.", + "ExecutionPlan for job [%s] was not found in ExecutionPlanStore and is needed for attaching JobResourceRequirements.", jobId)); } - JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements); - putJobGraph(jobGraph); + + checkState(executionPlan instanceof JobGraph); Review Comment: Yes, you're right. I have corrected it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org