zentol commented on a change in pull request #14950: URL: https://github.com/apache/flink/pull/14950#discussion_r581861367
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ########## @@ -1021,203 +220,20 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() { * * @return Termination future of this {@link ExecutionGraph}. */ - public CompletableFuture<JobStatus> getTerminationFuture() { - return terminationFuture; - } + CompletableFuture<JobStatus> getTerminationFuture(); @VisibleForTesting - public JobStatus waitUntilTerminal() throws InterruptedException { - try { - return terminationFuture.get(); - } catch (ExecutionException e) { - // this should never happen - // it would be a bug, so we don't expect this to be handled and throw - // an unchecked exception here - throw new RuntimeException(e); - } - } - - // ------------------------------------------------------------------------ - // State Transitions - // ------------------------------------------------------------------------ - - public boolean transitionState(JobStatus current, JobStatus newState) { - return transitionState(current, newState, null); - } - - private void transitionState(JobStatus newState, Throwable error) { - transitionState(state, newState, error); - } + JobStatus waitUntilTerminal() throws InterruptedException; - private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) { - assertRunningInJobMasterMainThread(); - // consistency check - if (current.isTerminalState()) { - String message = "Job is trying to leave terminal state " + current; - LOG.error(message); - throw new IllegalStateException(message); - } + boolean transitionState(JobStatus current, JobStatus newState); - // now do the actual state transition - if (state == current) { - state = newState; - LOG.info( - "Job {} ({}) switched from state {} to {}.", - getJobName(), - getJobID(), - current, - newState, - error); + void incrementRestarts(); - stateTimestamps[newState.ordinal()] = System.currentTimeMillis(); - notifyJobStatusChange(newState, error); 
- return true; - } else { - return false; - } - } + void initFailureCause(Throwable t); - public void incrementRestarts() { - numberOfRestartsCounter.inc(); - } + void vertexFinished(); Review comment: To add on to that, in the long term, interfaces for all Execution* classes would be pretty sick because we could re-implement the ExecutionGraph (EG) as we see fit, whereas an interface between the EG and vertices is probably not that beneficial. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org