XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r569986337
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() {
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
+        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));

Review comment:
   Here, I don't fully understand: are you referring to the case where a failure happens, the user cancels the job while the failure handling is still in progress, and the `failJob` method might then be called while the job is in a `CANCELED`/`CANCELLING` state? In that case, I still think we should archive the exception, because the user's intervention happened after the exception occurred.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }

+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();

Review comment:
   Thanks for the clarification.
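The `failJob` change in `SchedulerBase.java` defers archiving of the global failure until the scheduler's termination future completes. The following is a minimal, self-contained sketch of that deferral pattern using plain `CompletableFuture`. `SketchScheduler`, `terminate()` and the `List<Throwable>` history are hypothetical simplifications for illustration, not Flink's actual classes or types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified stand-in for a scheduler; NOT Flink's SchedulerBase.
class SketchScheduler {
    // Completes once the (simulated) job has reached a terminal state.
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    // Hypothetical exception history; Flink stores ErrorInfo entries instead.
    private final List<Throwable> exceptionHistory = new ArrayList<>();

    CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    void failJob(Throwable cause) {
        // Defer archiving: the callback only runs once the termination future completes,
        // on whichever thread completes it (single-threaded in this sketch).
        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
    }

    // Simulates the job reaching a terminal state, regardless of which one (FAILED, CANCELED, ...).
    void terminate() {
        terminationFuture.complete(null);
    }

    private void archiveGlobalFailure(Throwable cause) {
        exceptionHistory.add(cause);
    }

    List<Throwable> getExceptionHistory() {
        return Collections.unmodifiableList(exceptionHistory);
    }
}
```

In this sketch, calling `failJob(e)` leaves the history empty until `terminate()` is called, after which it contains exactly `e`. Because the callback is tied to termination rather than to a specific terminal state, a cancellation arriving between the failure and termination would still cause the failure to be archived here.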
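The test's `initiateFailure` helper starts by recording `System.currentTimeMillis()` and returns a `Range<Long>` that the assertions use to bound each `ErrorInfo` timestamp. The rest of that helper is not shown in the diff, so the following is only a sketch of the general "capture a time window around an action" idea. `TimeWindow` and `around(...)` are hypothetical names, and the clock is injected as a `LongSupplier` (an assumption, to keep the example deterministic and free of the Guava dependency).

```java
import java.util.function.LongSupplier;

// Hypothetical helper: brackets an action with two clock readings so that a timestamp
// produced inside the action can be asserted to lie within [lower, upper].
final class TimeWindow {
    final long lower;
    final long upper;

    TimeWindow(long lower, long upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Closed interval check, mirroring greaterThanOrEqualTo/lessThanOrEqualTo in the test.
    boolean contains(long timestamp) {
        return lower <= timestamp && timestamp <= upper;
    }

    // Reads the clock, runs the action, reads the clock again.
    static TimeWindow around(LongSupplier clock, Runnable action) {
        long start = clock.getAsLong();
        action.run();
        long end = clock.getAsLong();
        return new TimeWindow(start, end);
    }
}
```

With a real clock, you would call `TimeWindow.around(System::currentTimeMillis, () -> ...)`. Note that wall-clock time can jump (for example, after an NTP adjustment), so a monotonic source is safer when the timestamps under test allow it.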
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1022,18 +1021,38 @@ private boolean transitionState(ExecutionState currentState, ExecutionState newS
         return transitionState(currentState, newState, null);
     }

+    /**
+     * Try to transition to FAILED state from a given state and sets the {@code failureCause}.
+     *
+     * @param currentState of the execution
+     * @param cause the {@link Throwable} causing the failure
+     * @return true if the transition was successful, otherwise false
+     * @throws NullPointerException if no {@code cause} is provided
+     */
+    private boolean transitionToFailedStateAndSetFailureCause(
+            ExecutionState currentState, Throwable cause) {
+        Preconditions.checkNotNull(cause, "No cause is given when transitioning to FAILED state.");
+        failureCause = cause;
+        return transitionState(currentState, ExecutionState.FAILED, cause);

Review comment:
   That's actually a good point. I will correct that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
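As a companion to the `transitionToFailedStateAndSetFailureCause` helper in `Task.java`, here is a minimal sketch of a compare-and-set state transition that rejects a null cause and reports success as a boolean. `SketchTask`, `SketchState` and the two-argument `transitionState` are hypothetical simplifications, and `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`. One variation shown here is an assumption, not necessarily what the review discussion settled on: the sketch records the cause only when this call actually performs the transition.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, reduced set of states; Flink's ExecutionState has more values.
enum SketchState { CREATED, RUNNING, FAILED, CANCELED }

// Hypothetical stand-in for Task's state handling; NOT Flink's implementation.
class SketchTask {
    private final AtomicReference<SketchState> state = new AtomicReference<>(SketchState.CREATED);
    private volatile Throwable failureCause;

    SketchState getState() {
        return state.get();
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    // Atomically moves from currentState to newState; returns false if the state changed meanwhile.
    boolean transitionState(SketchState currentState, SketchState newState) {
        return state.compareAndSet(currentState, newState);
    }

    boolean transitionToFailedStateAndSetFailureCause(SketchState currentState, Throwable cause) {
        // Throws NullPointerException before any state change if no cause is given.
        Objects.requireNonNull(cause, "No cause is given when transitioning to FAILED state.");
        if (transitionState(currentState, SketchState.FAILED)) {
            // Only record the cause if this call actually moved the task to FAILED.
            failureCause = cause;
            return true;
        }
        return false;
    }
}
```

The trade-off of setting the cause after the compare-and-set is that a concurrent reader could briefly observe `FAILED` with a `null` cause. Setting it beforehand avoids that window, but can leave a cause recorded for a transition that never happened.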