1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1448255898
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
         final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
-        final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-                createFailureHandlingResultSnapshot(failureHandlingResult);
+        archiveFromFailureHandlingResult(
+                createFailureHandlingResultSnapshot(failureHandlingResult));
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
                                 cancelFuture.thenRunAsync(
-                                        () -> {
-                                            archiveFromFailureHandlingResult(

Review Comment:
Thanks for the clarification! The initial motivation was probably to collect all concurrent exceptions. In this PR, the solution is to save the `latestRootExceptionEntry` as a field in `SchedulerBase`. All subsequent non-root exceptions are then added to the `latestRootExceptionEntry`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
                 failureLabels,
                 failingTaskName,
                 taskManagerLocation,
-                StreamSupport.stream(executions.spliterator(), false)
-                        .filter(execution -> execution.getFailureInfo().isPresent())
-                        .map(
-                                execution ->
-                                        ExceptionHistoryEntry.create(
-                                                execution,
-                                                execution.getVertexWithAttempt(),
-                                                FailureEnricherUtils.EMPTY_FAILURE_LABELS))
-                        .collect(Collectors.toList()));
+                createExceptionHistoryEntries(executions));
+    }
+
+    public static List<ExceptionHistoryEntry> createExceptionHistoryEntries(

Review Comment:
> we could move the logic into addConcurrentExceptions and call addConcurrentExceptions within createRootExceptionHistoryEntry on the newly created instance.
It makes sense to me, thanks~

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {

Review Comment:
I added some comments to explain it. What do you think?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
     }
     public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
-            ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
+            ExceptionHistoryEntry entry, List<ExceptionHistoryEntry> entries) {

Review Comment:
I changed it to `Collection`. We need to merge more exceptions into `concurrentExceptions`, and an `Iterable` cannot be modified (it has no `add` or `addAll`), so the parameter has to be a `Collection`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(latestRootExceptionEntry != null, "It should have old failure.");
+            List<Execution> concurrentlyExecutions = new LinkedList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

Review Comment:
Thanks for the detailed review! Good catch. I changed it to an `ArrayList`. The code stays simple because we can use `addAll`, whereas adding the elements one at a time would make it more complex. WDYT?
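The approach discussed in this thread keeps the latest root entry as a field in `SchedulerBase`. When a failure is not a new attempt, the failure and its concurrent failures are merged into that entry. A minimal, self-contained sketch of this idea follows. It is not Flink code. `HistoryEntry`, `RootEntry`, `SchedulerSketch`, and `archive(...)` are hypothetical, simplified stand-ins for `ExceptionHistoryEntry`, `RootExceptionHistoryEntry`, `SchedulerBase`, and `archiveFromFailureHandlingResult`. Failures are modeled as plain strings instead of `Execution` objects, and the sketch assumes that `isNewAttempt()` means "this failure starts a new restart attempt".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ExceptionHistoryEntry: a failure is modeled as a plain string.
final class HistoryEntry {
    final String failure;

    HistoryEntry(String failure) {
        this.failure = failure;
    }
}

// Hypothetical stand-in for RootExceptionHistoryEntry. Concurrent exceptions are held in a
// mutable Collection (not an Iterable) so that later failures can be merged into this entry.
final class RootEntry {
    final String rootFailure;
    private final Collection<HistoryEntry> concurrentExceptions = new ArrayList<>();

    private RootEntry(String rootFailure) {
        this.rootFailure = rootFailure;
    }

    // Creates the instance first, then calls addConcurrentExceptions on it,
    // mirroring the refactoring suggested in the review.
    static RootEntry create(String rootFailure, Collection<HistoryEntry> concurrent) {
        RootEntry entry = new RootEntry(rootFailure);
        entry.addConcurrentExceptions(concurrent);
        return entry;
    }

    void addConcurrentExceptions(Collection<HistoryEntry> entries) {
        concurrentExceptions.addAll(entries);
    }

    // Read-only view for callers.
    Collection<HistoryEntry> getConcurrentExceptions() {
        return Collections.unmodifiableCollection(concurrentExceptions);
    }
}

// Hypothetical stand-in for the relevant part of SchedulerBase.
final class SchedulerSketch {
    final List<RootEntry> exceptionHistory = new ArrayList<>();
    private RootEntry latestRootExceptionEntry;

    // newAttempt == true: the failure starts a new restart attempt and becomes a new root entry.
    // newAttempt == false: the failure belongs to the attempt already being handled, so the root
    // cause and its concurrent failures are all merged into the latest root entry.
    void archive(String rootCause, List<String> concurrentlyFailed, boolean newAttempt) {
        List<HistoryEntry> concurrent = new ArrayList<>();
        for (String failure : concurrentlyFailed) {
            concurrent.add(new HistoryEntry(failure));
        }
        if (newAttempt) {
            latestRootExceptionEntry = RootEntry.create(rootCause, concurrent);
            exceptionHistory.add(latestRootExceptionEntry);
            return;
        }
        if (latestRootExceptionEntry == null) {
            throw new IllegalStateException("It should have old failure.");
        }
        // ArrayList + addAll: one add for the root cause, one addAll for the rest.
        List<HistoryEntry> merged = new ArrayList<>();
        merged.add(new HistoryEntry(rootCause));
        merged.addAll(concurrent);
        latestRootExceptionEntry.addConcurrentExceptions(merged);
    }
}
```

For example, archiving "task A failed" as a new attempt and then "task C failed" (with "task D failed" alongside it) as a non-new attempt leaves a single root entry for task A. That entry holds task A's own concurrent failures plus C and D. Keeping `concurrentExceptions` as a mutable `Collection` is what allows `addConcurrentExceptions` to be called after the entry is created.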