XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1444596677
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ##########
@@ -66,6 +66,9 @@ public class FailureHandlingResult {
     /** True if the original failure was a global failure. */
     private final boolean globalFailure;
+    /** Tue if current failure is a new attempt. */

Review Comment:
```suggestion
    /** True if current failure is a new attempt. */
```
nit

########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ##########
@@ -85,7 +89,9 @@ private FailureHandlingResult(
             CompletableFuture<Map<String, String>> failureLabels,
             @Nullable Set<ExecutionVertexID> verticesToRestart,
             long restartDelayMS,
-            boolean globalFailure) {
+            boolean globalFailure,
+            boolean isNewAttempt) {
+        this.isNewAttempt = isNewAttempt;

Review Comment:
nit: usually, contributors stick to the order of the parameter list when setting the fields within the constructor.

########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ##########
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
         final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
-        final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-                createFailureHandlingResultSnapshot(failureHandlingResult);
+        archiveFromFailureHandlingResult(
+                createFailureHandlingResultSnapshot(failureHandlingResult));
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
                                 cancelFuture.thenRunAsync(
-                                        () -> {
-                                            archiveFromFailureHandlingResult(

Review Comment:
> The first commit is refactoring, actually, I don't know why archiving exception when restarting task instead of immediately. It means, when one task failure, we can see the exception history after flink restart this task.

So the first commit is only a refactoring: it archives exceptions into the exception history immediately when they occur, instead of archiving them when the tasks are restarted. I guess the motivation of the original code was to be able to collect all concurrent exceptions that happen before triggering the restart. But you're right - it doesn't make a difference because we're creating the `FailureHandlingResultSnapshot` already earlier (before scheduling the restart). I'm just wondering whether we should have created the snapshot in the scheduled task rather than before scheduling it, to capture any concurrent exceptions that happened before the restart is triggered. :thinking:

########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.

Review Comment:
```suggestion
        // handle all subsequent exceptions as the concurrent exceptions when it's not a new attempt.
```
nit: the verb was missing. Additionally, `ConcurrentExceptions` indicates that it's some kind of class. We could use regular language casing here, in my opinion.
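As a side note to the `DefaultScheduler` comment above, a rough sketch of the alternative I had in mind (creating the snapshot inside the scheduled restart task). The `thenRunAsync` executor and the `schedule` delay arguments are assumptions about the surrounding method, not part of the quoted hunk:

```java
// Sketch only: build the snapshot when the restart actually fires, so concurrent
// failures reported between scheduling and triggering the restart are still captured.
delayExecutor.schedule(
        () ->
                FutureUtils.assertNoException(
                        cancelFuture.thenRunAsync(
                                () -> {
                                    archiveFromFailureHandlingResult(
                                            createFailureHandlingResultSnapshot(
                                                    failureHandlingResult));
                                    // ... existing restart logic ...
                                },
                                getMainThreadExecutor())), // executor assumed
        failureHandlingResult.getRestartDelayMS(), // delay arguments assumed
        TimeUnit.MILLISECONDS);
```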
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(latestRootExceptionEntry != null, "It should have old failure.");
+            List<Execution> concurrentlyExecutions = new LinkedList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

Review Comment:
nit: Here we use a LinkedList on a Set (which is returned by `getConcurrentlyFailedExecution`). Either we switch to `ArrayList` to benefit from the `addAll` call, or we revert the return type of `getConcurrentlyFailedExecution` back to `Iterable` and benefit from the single-element add performance of `LinkedList`. WDYT? :thinking:

########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java: ##########
@@ -150,7 +154,11 @@ public long getTimestamp() {
      *
      * @return The concurrently failed {@code Executions}.
      */
-    public Iterable<Execution> getConcurrentlyFailedExecution() {
+    public Set<Execution> getConcurrentlyFailedExecution() {

Review Comment:
I'm not sure whether this change is necessary. See my other related comment.

########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {

Review Comment:
The term "attempt" seems to be a bit ambiguous in the context of the `FailureHandlingResult`. WDYT? :thinking: But I cannot come up with a better proposal, either. :shrug:
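Coming back to the `LinkedList`/`ArrayList` question on `SchedulerBase` above, a minimal sketch of the `ArrayList` variant; the initial-capacity hint is an illustrative assumption, not something discussed in the thread:

```java
// ArrayList variant: pre-size for the root-cause execution plus the concurrently
// failed set (size hint assumed), so the subsequent addAll copies the Set without resizing.
final List<Execution> concurrentlyExecutions =
        new ArrayList<>(failureHandlingResult.getConcurrentlyFailedExecution().size() + 1);
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
```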
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java: ##########
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
     }
     public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
-            ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
+            ExceptionHistoryEntry entry, List<ExceptionHistoryEntry> entries) {

Review Comment:
`List` is too restrictive here, AFAIS.

########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(latestRootExceptionEntry != null, "It should have old failure.");

Review Comment:
```suggestion
            checkState(latestRootExceptionEntry != null, "A root exception entry should exist if failureHandlingResult wasn't generated as part of a new error handling cycle.");
```
Another nitty thing: Using "it" (or any other pronoun) in code causes ambiguity in a lot of cases. We might want to be more explicit when documenting code.

########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java: ##########
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
                 failureLabels,
                 failingTaskName,
                 taskManagerLocation,
-                StreamSupport.stream(executions.spliterator(), false)
-                        .filter(execution -> execution.getFailureInfo().isPresent())
-                        .map(
-                                execution ->
-                                        ExceptionHistoryEntry.create(
-                                                execution,
-                                                execution.getVertexWithAttempt(),
-                                                FailureEnricherUtils.EMPTY_FAILURE_LABELS))
-                        .collect(Collectors.toList()));
+                createExceptionHistoryEntries(executions));
+    }
+
+    public static List<ExceptionHistoryEntry> createExceptionHistoryEntries(

Review Comment:
The initial idea was to keep the exception history entries immutable. This change adds the `addConcurrentExceptions` method (which is ok, I guess, because the scheduler runs in the main thread and we don't have to be that strict about immutable objects in this case). But we don't need to expose `createExceptionHistoryEntries` here. Instead, we could move the logic into `addConcurrentExceptions` and call `addConcurrentExceptions` within `createRootExceptionHistoryEntry` on the newly created instance. WDYT?
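To sketch that last suggestion, one possible shape; the constructor arguments, the `concurrentExceptions` field name, and the `addConcurrentExceptions` signature are assumptions here, since they are not visible in the quoted hunks:

```java
// Sketch: keep the mapping logic inside addConcurrentExceptions and call it from the
// factory on the freshly created instance, so createExceptionHistoryEntries stays hidden.
private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
        /* ... existing parameters ... */ Iterable<Execution> executions) {
    final RootExceptionHistoryEntry entry =
            new RootExceptionHistoryEntry(
                    /* ... existing arguments ... */ new ArrayList<>());
    entry.addConcurrentExceptions(executions);
    return entry;
}

public void addConcurrentExceptions(Iterable<Execution> executions) {
    // Same transformation that createExceptionHistoryEntries performed, now kept internal;
    // concurrentExceptions is the (assumed) mutable collection backing this entry.
    StreamSupport.stream(executions.spliterator(), false)
            .filter(execution -> execution.getFailureInfo().isPresent())
            .map(
                    execution ->
                            ExceptionHistoryEntry.create(
                                    execution,
                                    execution.getVertexWithAttempt(),
                                    FailureEnricherUtils.EMPTY_FAILURE_LABELS))
            .forEach(this.concurrentExceptions::add);
}
```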