XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1444596677


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -66,6 +66,9 @@ public class FailureHandlingResult {
     /** True if the original failure was a global failure. */
     private final boolean globalFailure;
 
+    /** Tue if current failure is a new attempt. */

Review Comment:
   ```suggestion
       /** True if current failure is a new attempt. */
   ```
   nit



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -85,7 +89,9 @@ private FailureHandlingResult(
             CompletableFuture<Map<String, String>> failureLabels,
             @Nullable Set<ExecutionVertexID> verticesToRestart,
             long restartDelayMS,
-            boolean globalFailure) {
+            boolean globalFailure,
+            boolean isNewAttempt) {
+        this.isNewAttempt = isNewAttempt;

Review Comment:
   nit: fields are usually assigned in the constructor in the same order as
they appear in the parameter list.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
 
         final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
 
-        final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-                createFailureHandlingResultSnapshot(failureHandlingResult);
+        archiveFromFailureHandlingResult(
+                createFailureHandlingResultSnapshot(failureHandlingResult));
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
                                 cancelFuture.thenRunAsync(
-                                        () -> {
-                                            archiveFromFailureHandlingResult(

Review Comment:
   > The first commit is a refactoring. Actually, I don't know why exceptions
   > are archived when the task is restarted instead of immediately. It means
   > that when a task fails, we only see the exception in the exception history
   > after Flink restarts this task. So the first commit is only a refactoring:
   > it archives exceptions into the exception history immediately when they
   > occur, instead of archiving them when restarting.
   
   I guess the motivation was to collect all concurrent exceptions that happen
before the restart is triggered. But you're right: it doesn't make a difference,
because the `FailureHandlingResultSnapshot` is already created before the
restart is scheduled. I'm just wondering whether the snapshot should have been
created in the scheduled task instead, so that it captures any concurrent
exceptions that occur before the restart is triggered. :thinking:
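
   To make the timing question concrete, here is a minimal sketch. It is not
Flink code: `SnapshotTimingSketch`, `eagerSnapshot` and `lazySnapshot` are
hypothetical names, and a plain list of strings stands in for the failures seen
by the scheduler. It only shows that a snapshot taken before scheduling misses
anything that arrives during the restart delay, while one taken inside the
scheduled task would include it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the scheduler logic; not part of Flink. */
final class SnapshotTimingSketch {

    /** Copies the failures right away, i.e. before the restart is scheduled. */
    static Supplier<List<String>> eagerSnapshot(List<String> failures) {
        List<String> snapshot = new ArrayList<>(failures);
        return () -> snapshot;
    }

    /** Copies the failures only when the scheduled restart task actually runs. */
    static Supplier<List<String>> lazySnapshot(List<String> failures) {
        return () -> new ArrayList<>(failures);
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();
        failures.add("root failure");

        Supplier<List<String>> eager = eagerSnapshot(failures);
        Supplier<List<String>> lazy = lazySnapshot(failures);

        // A concurrent failure arrives while the restart delay is pending.
        failures.add("concurrent failure");

        // Calling get() models the moment the scheduled restart task runs.
        System.out.println("eager: " + eager.get());
        System.out.println("lazy:  " + lazy.get());
    }
}
```

   In the sketch, only the lazy variant sees the second failure. Whether that
matters in the scheduler depends on whether failures can still be added to the
same `FailureHandlingResult` during the delay, which this sketch simply assumes.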



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.

Review Comment:
   ```suggestion
           // handle all subsequent exceptions as the concurrent exceptions when it's not a new attempt.
   ```
   nit: the verb was missing. Additionally, the spelling `ConcurrentExceptions`
suggests that it refers to a class. In my opinion, we could use plain
natural-language casing here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(latestRootExceptionEntry != null, "It should have old failure.");
+            List<Execution> concurrentlyExecutions = new LinkedList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

Review Comment:
   nit: Here we fill a `LinkedList` from a `Set` (which is returned by
`getConcurrentlyFailedExecution`). Either we switch to `ArrayList` to benefit
from the bulk `addAll` call, or we revert the return type of
`getConcurrentlyFailedExecution` back to `Iterable` and benefit from the
single-element add performance of `LinkedList`. WDYT? :thinking:
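
   A minimal sketch of the two options, using only JDK collections. The class
and method names (`ConcurrentExecutionsSketch`, `fromSet`, `fromIterable`) are
hypothetical, and strings stand in for `Execution` instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration of the two options; not part of Flink. */
final class ConcurrentExecutionsSketch {

    /** Option 1: keep the Set return type and use a pre-sized ArrayList with addAll. */
    static <T> List<T> fromSet(Optional<T> root, Set<T> concurrent) {
        List<T> result = new ArrayList<>(concurrent.size() + 1);
        root.ifPresent(result::add);
        result.addAll(concurrent);
        return result;
    }

    /** Option 2: accept any Iterable and append element by element to a LinkedList. */
    static <T> List<T> fromIterable(Optional<T> root, Iterable<T> concurrent) {
        List<T> result = new LinkedList<>();
        root.ifPresent(result::add);
        concurrent.forEach(result::add);
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps insertion order so the printed output is stable.
        Set<String> concurrent = new LinkedHashSet<>(Arrays.asList("task-1", "task-2"));
        System.out.println(fromSet(Optional.of("root"), concurrent));
        System.out.println(fromIterable(Optional.of("root"), concurrent));
    }
}
```

   Both variants produce the same list, with the root cause first. The
difference is only in which parameter type the caller has to provide and how
the target list grows.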



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -150,7 +154,11 @@ public long getTimestamp() {
      *
      * @return The concurrently failed {@code Executions}.
      */
-    public Iterable<Execution> getConcurrentlyFailedExecution() {
+    public Set<Execution> getConcurrentlyFailedExecution() {

Review Comment:
   I'm not sure whether this change is necessary. See my other related
comment.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
 
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {

Review Comment:
   The term "attempt" seems to be a bit ambiguous in the context of the
`FailureHandlingResult`. WDYT? :thinking:
    But I cannot come up with a better proposal, either. :shrug:



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
     }
 
     public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
-            ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
+            ExceptionHistoryEntry entry, List<ExceptionHistoryEntry> entries) {

Review Comment:
   `List` is too restrictive here, AFAIS



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(latestRootExceptionEntry != null, "It should have old failure.");

Review Comment:
   ```suggestion
               checkState(latestRootExceptionEntry != null, "A root exception entry should exist if failureHandlingResult wasn't generated as part of a new error handling cycle.");
   ```
   Another nitty thing: Using "it" (or any other pronoun) in code often causes
ambiguity. We might want to be more explicit when documenting code.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
                 failureLabels,
                 failingTaskName,
                 taskManagerLocation,
-                StreamSupport.stream(executions.spliterator(), false)
-                        .filter(execution -> execution.getFailureInfo().isPresent())
-                        .map(
-                                execution ->
-                                        ExceptionHistoryEntry.create(
-                                                execution,
-                                                execution.getVertexWithAttempt(),
-                                                FailureEnricherUtils.EMPTY_FAILURE_LABELS))
-                        .collect(Collectors.toList()));
+                createExceptionHistoryEntries(executions));
+    }
+
+    public static List<ExceptionHistoryEntry> createExceptionHistoryEntries(

Review Comment:
   The initial idea was to keep the exception history entries immutable. This
change adds the `addConcurrentExceptions` method (which is ok, I guess, because
the scheduler runs in the main thread and we don't have to be that strict about
immutable objects in this case). But we don't need to expose
`createExceptionHistoryEntries` here. Instead, we could move the logic into
`addConcurrentExceptions` and call `addConcurrentExceptions` within
`createRootExceptionHistoryEntry` on the newly created instance. WDYT?
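
   Roughly what I have in mind, as a simplified sketch rather than the actual
class: `RootEntrySketch` is a hypothetical stand-in for
`RootExceptionHistoryEntry`, and an `Optional<String>` stands in for an
execution's failure info. The conversion logic lives only in
`addConcurrentExceptions`, and the factory method reuses it on the new instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified stand-in for RootExceptionHistoryEntry; not Flink code. */
final class RootEntrySketch {

    private final String rootCause;
    private final List<String> concurrentExceptions = new ArrayList<>();

    private RootEntrySketch(String rootCause) {
        this.rootCause = rootCause;
    }

    /** Factory: creates the entry, then reuses addConcurrentExceptions for the initial set. */
    static RootEntrySketch create(String rootCause, Iterable<Optional<String>> executions) {
        RootEntrySketch entry = new RootEntrySketch(rootCause);
        entry.addConcurrentExceptions(executions);
        return entry;
    }

    /** Converts executions to entries in one place, skipping those without failure info. */
    void addConcurrentExceptions(Iterable<Optional<String>> executions) {
        for (Optional<String> failureInfo : executions) {
            failureInfo.ifPresent(concurrentExceptions::add);
        }
    }

    String getRootCause() {
        return rootCause;
    }

    /** Read-only view, so callers can only add entries through addConcurrentExceptions. */
    List<String> getConcurrentExceptions() {
        return Collections.unmodifiableList(concurrentExceptions);
    }

    public static void main(String[] args) {
        RootEntrySketch entry =
                create("root", Arrays.asList(Optional.of("a"), Optional.<String>empty()));
        entry.addConcurrentExceptions(Arrays.asList(Optional.of("b")));
        System.out.println(entry.getRootCause() + " -> " + entry.getConcurrentExceptions());
    }
}
```

   With this shape, no static helper for building the entry list needs to be
public, and the only mutation path is the one method the scheduler calls.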


