rmetzger commented on a change in pull request #14950: URL: https://github.com/apache/flink/pull/14950#discussion_r578994044
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() {
      *
      * @return Termination future of this {@link ExecutionGraph}.
      */
-    public CompletableFuture<JobStatus> getTerminationFuture() {
-        return terminationFuture;
-    }
+    CompletableFuture<JobStatus> getTerminationFuture();

     @VisibleForTesting
-    public JobStatus waitUntilTerminal() throws InterruptedException {
-        try {
-            return terminationFuture.get();
-        } catch (ExecutionException e) {
-            // this should never happen
-            // it would be a bug, so we don't expect this to be handled and throw
-            // an unchecked exception here
-            throw new RuntimeException(e);
-        }
-    }
-
-    // ------------------------------------------------------------------------
-    //  State Transitions
-    // ------------------------------------------------------------------------
-
-    public boolean transitionState(JobStatus current, JobStatus newState) {
-        return transitionState(current, newState, null);
-    }
-
-    private void transitionState(JobStatus newState, Throwable error) {
-        transitionState(state, newState, error);
-    }
+    JobStatus waitUntilTerminal() throws InterruptedException;

-    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
-        assertRunningInJobMasterMainThread();
-        // consistency check
-        if (current.isTerminalState()) {
-            String message = "Job is trying to leave terminal state " + current;
-            LOG.error(message);
-            throw new IllegalStateException(message);
-        }
+    boolean transitionState(JobStatus current, JobStatus newState);

-        // now do the actual state transition
-        if (state == current) {
-            state = newState;
-            LOG.info(
-                    "Job {} ({}) switched from state {} to {}.",
-                    getJobName(),
-                    getJobID(),
-                    current,
-                    newState,
-                    error);
+    void incrementRestarts();

-            stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-            notifyJobStatusChange(newState, error);
-            return true;
-        } else {
-            return false;
-        }
-    }
+    void initFailureCause(Throwable t);

-    public void incrementRestarts() {
-        numberOfRestartsCounter.inc();
-    }
+    void vertexFinished();

Review comment:
Yes, I considered this as well. The problem is that it introduces a dependency on DefaultExecutionGraph again. For example, in https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java#L672 we would require a DefaultExecutionGraph in a test mock, voiding the benefits of this effort.
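
To make the point about test mocks concrete, here is a minimal sketch of why keeping these methods on the interface helps. All names below (`Status`, `Graph`, `RestartHandler`, `StubGraph`) are hypothetical, simplified stand-ins and not Flink's actual types. The assumption is only that the code under test sees the interface, so a test can pass in a small stub instead of building the full default implementation.

```java
// Hypothetical, simplified stand-ins; none of these are Flink's actual types.
enum Status { RUNNING, RESTARTING, FAILED }

/** Narrow interface: the only thing the scheduler-side code needs to see. */
interface Graph {
    boolean transitionState(Status current, Status next);

    void incrementRestarts();
}

/** Code under test depends on the interface, not on a concrete graph class. */
final class RestartHandler {
    private final Graph graph;

    RestartHandler(Graph graph) {
        this.graph = graph;
    }

    /** Moves RUNNING to RESTARTING and counts the restart only if the transition succeeded. */
    boolean onRecoverableFailure() {
        if (graph.transitionState(Status.RUNNING, Status.RESTARTING)) {
            graph.incrementRestarts();
            return true;
        }
        return false;
    }
}

/** Test double: a few lines of state, with no heavyweight setup required. */
final class StubGraph implements Graph {
    Status state = Status.RUNNING;
    int restarts = 0;

    @Override
    public boolean transitionState(Status current, Status next) {
        if (state != current) {
            return false; // expected state does not match, so nothing changes
        }
        state = next;
        return true;
    }

    @Override
    public void incrementRestarts() {
        restarts++;
    }
}

public class InterfaceMockSketch {
    public static void main(String[] args) {
        StubGraph stub = new StubGraph();
        boolean handled = new RestartHandler(stub).onRecoverableFailure();
        System.out.println(handled + " " + stub.state + " " + stub.restarts);
    }
}
```

If `RestartHandler` instead had to call a method that exists only on the concrete implementation, the stub would have to extend or wrap that implementation, which is the dependency the comment is trying to avoid.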