XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455102444


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
             CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
-            Iterable<ExceptionHistoryEntry> concurrentExceptions) {
+            Collection<ExceptionHistoryEntry> concurrentExceptions) {
         super(cause, timestamp, failureLabels, failingTaskName, taskManagerLocation);
         this.concurrentExceptions = concurrentExceptions;
     }
 
+    public void addConcurrentExceptions(Iterable<Execution> concurrentlyExecutions) {

Review Comment:
   The class was previously meant to be immutable, and this method changes that 
property. I'm wondering whether we should add `@NotThreadSafe` to the class to 
make that change more explicit.
   
   The change itself should still be OK because failure handling happens in the 
`JobMaster`'s main thread, which is how we avoid concurrency issues.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;

Review Comment:
   Aren't we also collecting exceptions here, given that the restart might 
happen with some delay? :thinking: 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -150,7 +154,17 @@ public long getTimestamp() {
      *
      * @return The concurrently failed {@code Executions}.
      */
-    public Iterable<Execution> getConcurrentlyFailedExecution() {
+    public Set<Execution> getConcurrentlyFailedExecution() {
         return Collections.unmodifiableSet(concurrentlyFailedExecutions);
     }
+
+    /**
+     * @return Whether the current failure is a new attempt. True means that the current failure is

Review Comment:
   ```suggestion
        * @return True means that the current failure is
   ```
   nit: The first sentence does not add any more information. You could also go 
with the proposal I shared in `FailureHandlingResult` if you think it's a 
reasonable change.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -71,7 +74,10 @@ void setUp() {
 
         failoverStrategy = new TestFailoverStrategy();
         testingFailureEnricher = new TestingFailureEnricher();
-        backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS);
+        isNewAttempt = new AtomicBoolean(true);
+        backoffTimeStrategy =
+                new TestRestartBackoffTimeStrategy(
+                        true, RESTART_DELAY_MS, () -> isNewAttempt.get());

Review Comment:
   ```suggestion
                           true, RESTART_DELAY_MS, isNewAttempt::get);
   ```
   nit
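
For reference, a minimal standalone sketch of why the two forms are interchangeable here: a lambda that only forwards to a single method can be written as a method reference. The class `MethodRefSketch` and the helper `readBoth` are hypothetical and not part of the PR; only `isNewAttempt` and the two supplier forms come from the diff above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class MethodRefSketch {
    // Hypothetical helper: reads the flag once through each supplier form.
    public static boolean[] readBoth(AtomicBoolean flag) {
        Supplier<Boolean> viaLambda = () -> flag.get();
        Supplier<Boolean> viaMethodRef = flag::get;
        return new boolean[] {viaLambda.get(), viaMethodRef.get()};
    }

    public static void main(String[] args) {
        AtomicBoolean isNewAttempt = new AtomicBoolean(true);
        boolean[] before = readBoth(isNewAttempt);
        isNewAttempt.set(false);
        boolean[] after = readBoth(isNewAttempt);
        // Both forms observe the same value before and after the update.
        System.out.println(before[0] + " " + before[1] + " " + after[0] + " " + after[1]);
    }
}
```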



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java:
##########
@@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy {
      * Notify the strategy about the task failure cause.
      *
      * @param cause of the task failure
+     * @return Whether the current failure is a new attempt. True means that the current failure is
+     *     a new attempt, false means that there has been a failure before and has not been tried
+     *     yet, and the current failure will be merged into the previous attempt.

Review Comment:
   ```suggestion
        * @return True means that the current failure is the first one after the most-recent failure
        *     handling happened, false means that there has been a failure before that was not handled
        *     yet, and the current failure will be considered in a combined failure handling effort.
   ```
   nit: just another proposal to remove the "attempt" from the description.
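
To make the proposed wording concrete, here is a minimal sketch of the return-value contract as I read it. It is not Flink's implementation: the class `FailureCycleSketch`, the `unhandledFailurePending` flag and the `markHandled()` hook are all hypothetical and only illustrate "first failure after the most-recent failure handling".

```java
public class FailureCycleSketch {
    // Hypothetical state: true while a failure was reported but not yet handled
    // (for example because the restart is still delayed).
    private boolean unhandledFailurePending = false;

    /** Returns true only for the first failure since the last failure handling. */
    public boolean notifyFailure(Throwable cause) {
        boolean firstInCycle = !unhandledFailurePending;
        unhandledFailurePending = true;
        return firstInCycle;
    }

    /** Hypothetical hook: invoked once the combined failure handling has happened. */
    public void markHandled() {
        unhandledFailurePending = false;
    }

    public static void main(String[] args) {
        FailureCycleSketch strategy = new FailureCycleSketch();
        System.out.println(strategy.notifyFailure(new RuntimeException("first")));
        System.out.println(strategy.notifyFailure(new RuntimeException("second")));
        strategy.markHandled();
        System.out.println(strategy.notifyFailure(new RuntimeException("third")));
    }
}
```

With this reading, a strategy that restarts after a delay would return `false` for every failure that arrives while the restart is still pending, which is the case the comments on the fixed-delay and failure-rate strategies ask about.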



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:
##########
@@ -79,11 +79,12 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         if (isFailureTimestampsQueueFull()) {
             failureTimestamps.remove();
         }
         failureTimestamps.add(clock.absoluteTimeMillis());
+        return true;

Review Comment:
   Same here: isn't this strategy also collecting failures before doing the 
restart? In that case, we would not have a "first attempt" every single time.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -171,6 +179,9 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(result.getFailureLabels().get())
                 .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        // NonRecoverableFailure is new attempt even if RestartBackoffTimeStrategy consider it's not
+        // new attempt.
+        assertThat(result.isNewAttempt()).isTrue();

Review Comment:
   ```suggestion
           assertThat(result.isNewAttempt())
                   .as(
                           "A NonRecoverableFailure should be new attempt even if RestartBackoffTimeStrategy consider it's not new attempt.")
                   .isTrue();
   ```
   just another nitty hint: For tests, we don't need to add comments but can 
use a descriptive assertion message instead. That serves the same purpose as 
the comment but also makes the test output more descriptive.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // Handle all subsequent exceptions as the concurrent exceptions when it's not a new
+        // attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(
+                    latestRootExceptionEntry != null,
+                    "A root exception entry should exist if failureHandlingResult wasn't "
+                            + "generated as part of a new error handling cycle.");
+            List<Execution> concurrentlyExecutions = new ArrayList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+            latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+            return;
+        }
+
         if (failureHandlingResult.getRootCauseExecution().isPresent()) {

Review Comment:
   ```suggestion
           } else if (failureHandlingResult.getRootCauseExecution().isPresent()) {
   ```
   nit: Just a proposal to make it more obvious that we're handling three 
mutually exclusive cases in this method. If you go with it, you might want to 
move the comment into the if block.
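
A rough sketch of the shape I have in mind, reduced to the branching only. Everything here is hypothetical (`ArchiveBranchSketch`, the `Branch` labels, the `String` stand-in for an `Execution`), and it assumes the remaining `else` branch is the global-failure path:

```java
import java.util.Optional;

public class ArchiveBranchSketch {
    // Hypothetical labels for the three mutually exclusive cases.
    public enum Branch { MERGE_INTO_LATEST_ROOT, NEW_LOCAL_ROOT, NEW_GLOBAL_ROOT }

    public static Branch selectBranch(boolean isNewAttempt, Optional<String> rootCauseExecution) {
        if (!isNewAttempt) {
            // Handle all subsequent exceptions as concurrent exceptions of the latest root entry.
            return Branch.MERGE_INTO_LATEST_ROOT;
        } else if (rootCauseExecution.isPresent()) {
            // New attempt with a specific execution as the root cause.
            return Branch.NEW_LOCAL_ROOT;
        } else {
            // New attempt without a root-cause execution (assumed: global failure).
            return Branch.NEW_GLOBAL_ROOT;
        }
    }

    public static void main(String[] args) {
        System.out.println(selectBranch(false, Optional.of("task-1")));
        System.out.println(selectBranch(true, Optional.of("task-1")));
        System.out.println(selectBranch(true, Optional.empty()));
    }
}
```

The single `if` / `else if` / `else` chain removes the need for the early `return` and lets the reader see all three cases at the same indentation level.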



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   The comment is obsolete because it can be expressed through the test 
method name(s) (to reduce redundant information in the code). 
   
   Additionally, what's your opinion on splitting this big test up into smaller 
test methods? I see the benefit of having one big one, because you can check 
the number of restarts at the same time. But more specific test methods 
usually help improve the readability of the test code. WDYT?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
 
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {

Review Comment:
   What about calling it `isRootCause` here, analogous to the exception 
history? ...or `isFirstErrorInNewErrorHandlingCycle` 🤔 These are just other, 
more descriptive proposals. Up to you, I'm not fighting for any version. :innocent: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);

Review Comment:
   Can you move these two lines further down to where they are actually used? 
(Alternatively, you could move the Predicate initialization up.) The goal would 
be to have the entire initialization in one place. Personally, I would prefer 
moving the code down. That gives less distraction to the reader, who can then 
focus on the handling of the first concurrent exception at the beginning. 
But up to you...



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);

Review Comment:
   Thinking about it: Another solution would be to create a dedicated test 
method `testAddConcurrentExceptions`. WDYT? :thinking: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -150,25 +164,24 @@ void testFromGlobalFailure() throws ExecutionException, InterruptedException {
                         ExceptionHistoryEntryMatcher.matchesGlobalFailure(
                                 rootCause, rootTimestamp, rootFailureLabels.get()));
 
-        final Predicate<ExceptionHistoryEntry> exception0Predicate =
-                ExceptionHistoryEntryMatcher.matchesFailure(
-                        concurrentException0,
-                        concurrentExceptionTimestamp0,
-                        concurrentlyFailedExecutionVertex0.getTaskNameWithSubtaskIndex(),
-                        concurrentlyFailedExecutionVertex0.getCurrentAssignedResourceLocation());
-        final Predicate<ExceptionHistoryEntry> exception1Predicate =
-                ExceptionHistoryEntryMatcher.matchesFailure(
-                        concurrentException1,
-                        concurrentExceptionTimestamp1,
-                        concurrentlyFailedExecutionVertex1.getTaskNameWithSubtaskIndex(),
-                        concurrentlyFailedExecutionVertex1.getCurrentAssignedResourceLocation());
         assertThat(actualEntry.getConcurrentExceptions())
                 .allMatch(
                         exceptionHistoryEntry ->
                                 exception0Predicate.test(exceptionHistoryEntry)
                                         || exception1Predicate.test(exceptionHistoryEntry));
     }
 
+    private Predicate<ExceptionHistoryEntry> getExceptionHistoryEntryPredicate(

Review Comment:
   ```suggestion
       private Predicate<ExceptionHistoryEntry> triggerFailureAndCreateEntryMatcher(
   ```
   Based on the old name, I didn't expect the method to trigger the failure. We 
might want to mention that in the method name because it's an essential part of 
each test scenario that the failure is actually registered through a state 
update in the `ExecutionGraph`.


