zhuzhurk commented on code in PR #25492:
URL: https://github.com/apache/flink/pull/25492#discussion_r1796834424


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStore.java:
##########
@@ -109,153 +110,163 @@ public void stop() throws Exception {
         synchronized (lock) {
             if (running) {
                 running = false;
-                LOG.info("Stopping DefaultJobGraphStore.");
+                LOG.info("Stopping DefaultExecutionPlanStore.");
                 Exception exception = null;
 
                 try {
-                    jobGraphStateHandleStore.releaseAll();
+                    executionPlanStateHandleStore.releaseAll();
                 } catch (Exception e) {
                     exception = e;
                 }
 
                 try {
-                    jobGraphStoreWatcher.stop();
+                    executionPlanStoreWatcher.stop();
                 } catch (Exception e) {
                     exception = ExceptionUtils.firstOrSuppressed(e, exception);
                 }
 
                 if (exception != null) {
                     throw new FlinkException(
-                            "Could not properly stop the 
DefaultJobGraphStore.", exception);
+                            "Could not properly stop the 
DefaultExecutionPlanStore.", exception);
                 }
             }
         }
     }
 
     @Nullable
     @Override
-    public JobGraph recoverJobGraph(JobID jobId) throws Exception {
+    public ExecutionPlan recoverExecutionPlan(JobID jobId) throws Exception {
         checkNotNull(jobId, "Job ID");
 
-        LOG.debug("Recovering job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+        LOG.debug("Recovering execution plan {} from {}.", jobId, 
executionPlanStateHandleStore);
 
-        final String name = jobGraphStoreUtil.jobIDToName(jobId);
+        final String name = executionPlanStoreUtil.jobIDToName(jobId);
 
         synchronized (lock) {
             verifyIsRunning();
 
             boolean success = false;
 
-            RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle;
+            RetrievableStateHandle<ExecutionPlan> 
executionPlanRetrievableStateHandle;
 
             try {
                 try {
-                    jobGraphRetrievableStateHandle = 
jobGraphStateHandleStore.getAndLock(name);
+                    executionPlanRetrievableStateHandle =
+                            executionPlanStateHandleStore.getAndLock(name);
                 } catch (StateHandleStore.NotExistException ignored) {
                     success = true;
                     return null;
                 } catch (Exception e) {
                     throw new FlinkException(
-                            "Could not retrieve the submitted job graph state 
handle "
+                            "Could not retrieve the submitted execution plan 
state handle "
                                     + "for "
                                     + name
-                                    + " from the submitted job graph store.",
+                                    + " from the submitted execution plan 
store.",
                             e);
                 }
 
-                JobGraph jobGraph;
+                ExecutionPlan executionPlan;
                 try {
-                    jobGraph = jobGraphRetrievableStateHandle.retrieveState();
+                    executionPlan = 
executionPlanRetrievableStateHandle.retrieveState();
                 } catch (ClassNotFoundException cnfe) {
                     throw new FlinkException(
-                            "Could not retrieve submitted JobGraph from state 
handle under "
+                            "Could not retrieve submitted ExecutionPlan from 
state handle under "
                                     + name
                                     + ". This indicates that you are trying to 
recover from state written by an "
                                     + "older Flink version which is not 
compatible. Try cleaning the state handle store.",
                             cnfe);
                 } catch (IOException ioe) {
                     throw new FlinkException(
-                            "Could not retrieve submitted JobGraph from state 
handle under "
+                            "Could not retrieve submitted ExecutionPlan from 
state handle under "
                                     + name
                                     + ". This indicates that the retrieved 
state handle is broken. Try cleaning the state handle "
                                     + "store.",
                             ioe);
                 }
 
-                addedJobGraphs.add(jobGraph.getJobID());
+                addedExecutionPlans.add(executionPlan.getJobID());
 
-                LOG.info("Recovered {}.", jobGraph);
+                LOG.info("Recovered {}.", executionPlan);
 
                 success = true;
-                return jobGraph;
+                return executionPlan;
             } finally {
                 if (!success) {
-                    jobGraphStateHandleStore.release(name);
+                    executionPlanStateHandleStore.release(name);
                 }
             }
         }
     }
 
     @Override
-    public void putJobGraph(JobGraph jobGraph) throws Exception {
-        checkNotNull(jobGraph, "Job graph");
+    public void putExecutionPlan(ExecutionPlan executionPlan) throws Exception 
{
+        checkNotNull(executionPlan, "Execution Plan");
 
-        final JobID jobID = jobGraph.getJobID();
-        final String name = jobGraphStoreUtil.jobIDToName(jobID);
+        final JobID jobID = executionPlan.getJobID();
+        final String name = executionPlanStoreUtil.jobIDToName(jobID);
 
-        LOG.debug("Adding job graph {} to {}.", jobID, 
jobGraphStateHandleStore);
+        LOG.debug("Adding execution plan {} to {}.", jobID, 
executionPlanStateHandleStore);
 
         boolean success = false;
 
         while (!success) {
             synchronized (lock) {
                 verifyIsRunning();
 
-                final R currentVersion = jobGraphStateHandleStore.exists(name);
+                final R currentVersion = 
executionPlanStateHandleStore.exists(name);
 
                 if (!currentVersion.isExisting()) {
                     try {
-                        jobGraphStateHandleStore.addAndLock(name, jobGraph);
+                        executionPlanStateHandleStore.addAndLock(name, 
executionPlan);
 
-                        addedJobGraphs.add(jobID);
+                        addedExecutionPlans.add(jobID);
 
                         success = true;
                     } catch (StateHandleStore.AlreadyExistException ignored) {
-                        LOG.warn("{} already exists in {}.", jobGraph, 
jobGraphStateHandleStore);
+                        LOG.warn(
+                                "{} already exists in {}.",
+                                executionPlan,
+                                executionPlanStateHandleStore);
                     }
-                } else if (addedJobGraphs.contains(jobID)) {
+                } else if (addedExecutionPlans.contains(jobID)) {
                     try {
-                        jobGraphStateHandleStore.replace(name, currentVersion, 
jobGraph);
-                        LOG.info("Updated {} in {}.", jobGraph, 
getClass().getSimpleName());
+                        executionPlanStateHandleStore.replace(name, 
currentVersion, executionPlan);
+                        LOG.info("Updated {} in {}.", executionPlan, 
getClass().getSimpleName());
 
                         success = true;
                     } catch (StateHandleStore.NotExistException ignored) {
-                        LOG.warn("{} does not exists in {}.", jobGraph, 
jobGraphStateHandleStore);
+                        LOG.warn(
+                                "{} does not exists in {}.",
+                                executionPlan,
+                                executionPlanStateHandleStore);
                     }
                 } else {
                     throw new IllegalStateException(
-                            "Trying to update a graph you didn't "
-                                    + "#getAllSubmittedJobGraphs() or 
#putJobGraph() yourself before.");
+                            "Trying to update a execution plan you didn't "

Review Comment:
   a -> an



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStore.java:
##########
@@ -109,153 +110,163 @@ public void stop() throws Exception {
         synchronized (lock) {
             if (running) {
                 running = false;
-                LOG.info("Stopping DefaultJobGraphStore.");
+                LOG.info("Stopping DefaultExecutionPlanStore.");
                 Exception exception = null;
 
                 try {
-                    jobGraphStateHandleStore.releaseAll();
+                    executionPlanStateHandleStore.releaseAll();
                 } catch (Exception e) {
                     exception = e;
                 }
 
                 try {
-                    jobGraphStoreWatcher.stop();
+                    executionPlanStoreWatcher.stop();
                 } catch (Exception e) {
                     exception = ExceptionUtils.firstOrSuppressed(e, exception);
                 }
 
                 if (exception != null) {
                     throw new FlinkException(
-                            "Could not properly stop the 
DefaultJobGraphStore.", exception);
+                            "Could not properly stop the 
DefaultExecutionPlanStore.", exception);
                 }
             }
         }
     }
 
     @Nullable
     @Override
-    public JobGraph recoverJobGraph(JobID jobId) throws Exception {
+    public ExecutionPlan recoverExecutionPlan(JobID jobId) throws Exception {
         checkNotNull(jobId, "Job ID");
 
-        LOG.debug("Recovering job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+        LOG.debug("Recovering execution plan {} from {}.", jobId, 
executionPlanStateHandleStore);
 
-        final String name = jobGraphStoreUtil.jobIDToName(jobId);
+        final String name = executionPlanStoreUtil.jobIDToName(jobId);
 
         synchronized (lock) {
             verifyIsRunning();
 
             boolean success = false;
 
-            RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle;
+            RetrievableStateHandle<ExecutionPlan> 
executionPlanRetrievableStateHandle;
 
             try {
                 try {
-                    jobGraphRetrievableStateHandle = 
jobGraphStateHandleStore.getAndLock(name);
+                    executionPlanRetrievableStateHandle =
+                            executionPlanStateHandleStore.getAndLock(name);
                 } catch (StateHandleStore.NotExistException ignored) {
                     success = true;
                     return null;
                 } catch (Exception e) {
                     throw new FlinkException(
-                            "Could not retrieve the submitted job graph state 
handle "
+                            "Could not retrieve the submitted execution plan 
state handle "
                                     + "for "
                                     + name
-                                    + " from the submitted job graph store.",
+                                    + " from the submitted execution plan 
store.",
                             e);
                 }
 
-                JobGraph jobGraph;
+                ExecutionPlan executionPlan;
                 try {
-                    jobGraph = jobGraphRetrievableStateHandle.retrieveState();
+                    executionPlan = 
executionPlanRetrievableStateHandle.retrieveState();
                 } catch (ClassNotFoundException cnfe) {
                     throw new FlinkException(
-                            "Could not retrieve submitted JobGraph from state 
handle under "
+                            "Could not retrieve submitted ExecutionPlan from 
state handle under "
                                     + name
                                     + ". This indicates that you are trying to 
recover from state written by an "
                                     + "older Flink version which is not 
compatible. Try cleaning the state handle store.",
                             cnfe);
                 } catch (IOException ioe) {
                     throw new FlinkException(
-                            "Could not retrieve submitted JobGraph from state 
handle under "
+                            "Could not retrieve submitted ExecutionPlan from 
state handle under "
                                     + name
                                     + ". This indicates that the retrieved 
state handle is broken. Try cleaning the state handle "
                                     + "store.",
                             ioe);
                 }
 
-                addedJobGraphs.add(jobGraph.getJobID());
+                addedExecutionPlans.add(executionPlan.getJobID());
 
-                LOG.info("Recovered {}.", jobGraph);
+                LOG.info("Recovered {}.", executionPlan);
 
                 success = true;
-                return jobGraph;
+                return executionPlan;
             } finally {
                 if (!success) {
-                    jobGraphStateHandleStore.release(name);
+                    executionPlanStateHandleStore.release(name);
                 }
             }
         }
     }
 
     @Override
-    public void putJobGraph(JobGraph jobGraph) throws Exception {
-        checkNotNull(jobGraph, "Job graph");
+    public void putExecutionPlan(ExecutionPlan executionPlan) throws Exception 
{
+        checkNotNull(executionPlan, "Execution Plan");
 
-        final JobID jobID = jobGraph.getJobID();
-        final String name = jobGraphStoreUtil.jobIDToName(jobID);
+        final JobID jobID = executionPlan.getJobID();
+        final String name = executionPlanStoreUtil.jobIDToName(jobID);
 
-        LOG.debug("Adding job graph {} to {}.", jobID, 
jobGraphStateHandleStore);
+        LOG.debug("Adding execution plan {} to {}.", jobID, 
executionPlanStateHandleStore);
 
         boolean success = false;
 
         while (!success) {
             synchronized (lock) {
                 verifyIsRunning();
 
-                final R currentVersion = jobGraphStateHandleStore.exists(name);
+                final R currentVersion = 
executionPlanStateHandleStore.exists(name);
 
                 if (!currentVersion.isExisting()) {
                     try {
-                        jobGraphStateHandleStore.addAndLock(name, jobGraph);
+                        executionPlanStateHandleStore.addAndLock(name, 
executionPlan);
 
-                        addedJobGraphs.add(jobID);
+                        addedExecutionPlans.add(jobID);
 
                         success = true;
                     } catch (StateHandleStore.AlreadyExistException ignored) {
-                        LOG.warn("{} already exists in {}.", jobGraph, 
jobGraphStateHandleStore);
+                        LOG.warn(
+                                "{} already exists in {}.",
+                                executionPlan,
+                                executionPlanStateHandleStore);
                     }
-                } else if (addedJobGraphs.contains(jobID)) {
+                } else if (addedExecutionPlans.contains(jobID)) {
                     try {
-                        jobGraphStateHandleStore.replace(name, currentVersion, 
jobGraph);
-                        LOG.info("Updated {} in {}.", jobGraph, 
getClass().getSimpleName());
+                        executionPlanStateHandleStore.replace(name, 
currentVersion, executionPlan);
+                        LOG.info("Updated {} in {}.", executionPlan, 
getClass().getSimpleName());
 
                         success = true;
                     } catch (StateHandleStore.NotExistException ignored) {
-                        LOG.warn("{} does not exists in {}.", jobGraph, 
jobGraphStateHandleStore);
+                        LOG.warn(
+                                "{} does not exists in {}.",
+                                executionPlan,
+                                executionPlanStateHandleStore);
                     }
                 } else {
                     throw new IllegalStateException(
-                            "Trying to update a graph you didn't "
-                                    + "#getAllSubmittedJobGraphs() or 
#putJobGraph() yourself before.");
+                            "Trying to update a execution plan you didn't "
+                                    + "#getAllSubmittedExecutionPlans() or 
#putExecutionPlan() yourself before.");
                 }
             }
         }
 
-        LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
+        LOG.info("Added {} to {}.", executionPlan, 
executionPlanStateHandleStore);
     }
 
     @Override
     public void putJobResourceRequirements(
             JobID jobId, JobResourceRequirements jobResourceRequirements) 
throws Exception {
         synchronized (lock) {
-            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
-            if (jobGraph == null) {
+            @Nullable final ExecutionPlan executionPlan = 
recoverExecutionPlan(jobId);
+            if (executionPlan == null) {
                 throw new NoSuchElementException(
                         String.format(
-                                "JobGraph for job [%s] was not found in 
JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                "ExecutionPlan for job [%s] was not found in 
ExecutionPlanStore and is needed for attaching JobResourceRequirements.",
                                 jobId));
             }
-            JobResourceRequirements.writeToJobGraph(jobGraph, 
jobResourceRequirements);
-            putJobGraph(jobGraph);
+
+            checkState(executionPlan instanceof JobGraph);

Review Comment:
   Is this method expected to be only invoked on a `JobGraph`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to