XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r569986337



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor 
getMainThreadExecutor() {
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
+        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));

Review comment:
       Here, I don't understand fully: You're referring to the case where a 
failure happens, the user cancels the job while the failure handling is done 
and the `failJob` method might be called while being in a 
`CANCELED`/`CANCELLING` state?For this case, I would still think that we should 
archive the exception because the users intervention happened after the 
exception happened.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void 
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new 
RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, 
restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new 
RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, 
failingException);
+
+        List<ErrorInfo> actualExceptionHistory = 
scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                
greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                
lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, 
org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                
greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();

Review comment:
       Thanks for clarification.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1022,18 +1021,38 @@ private boolean transitionState(ExecutionState 
currentState, ExecutionState newS
         return transitionState(currentState, newState, null);
     }
 
+    /**
+     * Try to transition to FAILED state from a given state and sets the 
{@code failureCause}.
+     *
+     * @param currentState of the execution
+     * @param cause the {@link Throwable} causing the failure
+     * @return true if the transition was successful, otherwise false
+     * @throws NullPointerException if no {@code cause} is provided
+     */
+    private boolean transitionToFailedStateAndSetFailureCause(
+            ExecutionState currentState, Throwable cause) {
+        Preconditions.checkNotNull(cause, "No cause is given when 
transitioning to FAILED state.");
+        failureCause = cause;
+        return transitionState(currentState, ExecutionState.FAILED, cause);

Review comment:
       That's actually a good point. I will correct that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to