1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1448255898


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
 
         final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
 
-        final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-                createFailureHandlingResultSnapshot(failureHandlingResult);
+        archiveFromFailureHandlingResult(
+                createFailureHandlingResultSnapshot(failureHandlingResult));
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
                                 cancelFuture.thenRunAsync(
-                                        () -> {
-                                            archiveFromFailureHandlingResult(

Review Comment:
   Thanks for the clarification!
   
   The initial motivation might have been to collect all concurrent exceptions. In this PR, the solution is to save `latestRootExceptionEntry` as a field in `SchedulerBase`, so that all subsequent non-root exceptions are added to `latestRootExceptionEntry`.
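   A minimal sketch of that bookkeeping, assuming a heavily simplified model: failures are plain strings, and `RootEntry`, `ExceptionHistorySketch` and `archive(...)` are hypothetical stand-ins for `RootExceptionHistoryEntry` and the archiving methods in `SchedulerBase`, not Flink's actual API. A new attempt starts a new root entry and remembers it; any other failure is merged into the remembered entry.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical, simplified stand-in for RootExceptionHistoryEntry. */
class RootEntry {
    final String rootCause;
    // Mutable so that later, non-root failures can be merged in.
    final List<String> concurrentExceptions = new ArrayList<>();

    RootEntry(String rootCause) {
        this.rootCause = rootCause;
    }

    void addConcurrentExceptions(Collection<String> failures) {
        concurrentExceptions.addAll(failures);
    }
}

/** Hypothetical scheduler-side history that remembers the latest root entry. */
class ExceptionHistorySketch {
    final List<RootEntry> history = new ArrayList<>();
    // Mirrors the idea of keeping `latestRootExceptionEntry` as a field.
    RootEntry latestRootEntry;

    void archive(boolean isNewAttempt, String rootCause, Collection<String> concurrent) {
        if (isNewAttempt) {
            // A new attempt creates a new root entry and appends it to the history.
            latestRootEntry = new RootEntry(rootCause);
            latestRootEntry.addConcurrentExceptions(concurrent);
            history.add(latestRootEntry);
        } else {
            // Not a new attempt: all failures become concurrent exceptions of the old root.
            if (latestRootEntry == null) {
                throw new IllegalStateException("It should have an old failure.");
            }
            List<String> all = new ArrayList<>();
            all.add(rootCause);
            all.addAll(concurrent);
            latestRootEntry.addConcurrentExceptions(all);
        }
    }
}
```

   With this model, archiving a new attempt followed by a non-new failure leaves a single history entry whose concurrent exceptions contain the later failures.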



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
                 failureLabels,
                 failingTaskName,
                 taskManagerLocation,
-                StreamSupport.stream(executions.spliterator(), false)
-                        .filter(execution -> execution.getFailureInfo().isPresent())
-                        .map(
-                                execution ->
-                                        ExceptionHistoryEntry.create(
-                                                execution,
-                                                execution.getVertexWithAttempt(),
-                                                FailureEnricherUtils.EMPTY_FAILURE_LABELS))
-                        .collect(Collectors.toList()));
+                createExceptionHistoryEntries(executions));
+    }
+
+    public static List<ExceptionHistoryEntry> createExceptionHistoryEntries(

Review Comment:
   > we could move the logic into addConcurrentExceptions and call 
addConcurrentExceptions within createRootExceptionHistoryEntry on the newly 
created instance. 
   
   It makes sense to me, thanks~
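   A minimal sketch of the suggested refactoring, assuming a simplified model: `Exec` is a hypothetical stand-in for `Execution`, an `Optional<String>` stands in for its failure info, and `RootEntrySketch` is illustrative rather than Flink's code. The factory creates the entry with no concurrent exceptions and then calls `addConcurrentExceptions` on the new instance, so the "keep only executions that actually failed" filter lives in one place.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for Execution: only the optional failure info matters here. */
final class Exec {
    private final Optional<String> failureInfo;

    Exec(Optional<String> failureInfo) {
        this.failureInfo = failureInfo;
    }

    Optional<String> getFailureInfo() {
        return failureInfo;
    }
}

/** Hypothetical, simplified root entry; not Flink's RootExceptionHistoryEntry. */
final class RootEntrySketch {
    private final String failure;
    private final List<String> concurrentExceptions = new ArrayList<>();

    private RootEntrySketch(String failure) {
        this.failure = failure;
    }

    /** Factory: create the instance first, then reuse addConcurrentExceptions on it. */
    static RootEntrySketch create(String failure, Iterable<Exec> executions) {
        RootEntrySketch entry = new RootEntrySketch(failure);
        entry.addConcurrentExceptions(executions);
        return entry;
    }

    /** The single place that skips executions without failure info. */
    void addConcurrentExceptions(Iterable<Exec> executions) {
        for (Exec execution : executions) {
            execution.getFailureInfo().ifPresent(concurrentExceptions::add);
        }
    }

    String getFailure() {
        return failure;
    }

    List<String> getConcurrentExceptions() {
        return Collections.unmodifiableList(concurrentExceptions);
    }
}
```

   The same `addConcurrentExceptions` method can later be called again on an existing entry, which is what the scheduler needs for failures that do not start a new attempt.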



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
 
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {

Review Comment:
   I added some comments to explain it. What do you think?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
     }
 
     public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
-            ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
+            ExceptionHistoryEntry entry, List<ExceptionHistoryEntry> entries) {

Review Comment:
   Changed it to `Collection`.
   
   We need to merge more exceptions into `concurrentExceptions`, and `Iterable` doesn't support modification, so the type is now `Collection`.
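   To illustrate the type-level point: `Iterable` only promises iteration, while `Collection` adds `size()` and mutators such as `addAll`. `addAll` is an optional operation, though (immutable collections like `List.of(...)` throw `UnsupportedOperationException`), so a minimal sketch, assuming a hypothetical `MergeableEntry` that stores exceptions as strings, copies the incoming `Collection` into its own `ArrayList` before merging.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical entry that accepts a Collection so it can be copied and extended. */
class MergeableEntry {
    // Own mutable copy: the caller's collection may be immutable (e.g. List.of).
    private final List<String> concurrentExceptions;

    MergeableEntry(Collection<String> entries) {
        // A Collection can be copied in one step; an Iterable would need a manual loop.
        this.concurrentExceptions = new ArrayList<>(entries);
    }

    void merge(Collection<String> more) {
        concurrentExceptions.addAll(more);
    }

    List<String> concurrentExceptions() {
        return concurrentExceptions;
    }
}
```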



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(latestRootExceptionEntry != null, "It should have old failure.");
+            List<Execution> concurrentlyExecutions = new LinkedList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

Review Comment:
   Thanks for the detailed review!
   
   Good catch, I changed it to `ArrayList`. The code stays simple when we use `addAll`; if we added the elements one by one, the code would get more complex.
   
   WDYT?
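   A minimal sketch of building the list with an `ArrayList`, assuming `String` stands in for `Execution` and the hypothetical `collectFailedExecutions` helper stands in for the two calls on `failureHandlingResult`: the optional root cause is added via `Optional.ifPresent(list::add)` and the concurrently failed executions via a single `addAll`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

class ConcurrentExecutionsSketch {
    /** Hypothetical helper: root cause first (if present), then the concurrent failures. */
    static List<String> collectFailedExecutions(
            Optional<String> rootCauseExecution, Collection<String> concurrentlyFailed) {
        // Capacity covers the case where the root cause is present, so no resize is needed.
        List<String> executions = new ArrayList<>(concurrentlyFailed.size() + 1);
        rootCauseExecution.ifPresent(executions::add);
        executions.addAll(concurrentlyFailed);
        return executions;
    }
}
```

   For an append-then-iterate list like this, `ArrayList` is usually preferred over `LinkedList` because it avoids a separate node allocation per element.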



-- 
This is an automated message from the Apache Git Service.