XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1455102444
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
             CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
-            Iterable<ExceptionHistoryEntry> concurrentExceptions) {
+            Collection<ExceptionHistoryEntry> concurrentExceptions) {
         super(cause, timestamp, failureLabels, failingTaskName, taskManagerLocation);
         this.concurrentExceptions = concurrentExceptions;
     }
+    public void addConcurrentExceptions(Iterable<Execution> concurrentlyExecutions) {

Review Comment:
   The class was previously meant to be immutable. This method changes that property. I'm wondering whether we should add `@NotThreadSafe` to the class to make that specific change more explicit. The change itself should still be OK because failure handling happens in the `JobMaster`'s main thread, which is how we avoid concurrency issues.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;

Review Comment:
   Aren't we also collecting exceptions here? The restart might happen with some delay. :thinking:

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -150,7 +154,17 @@ public long getTimestamp() {
      *
      * @return The concurrently failed {@code Executions}.
      */
-    public Iterable<Execution> getConcurrentlyFailedExecution() {
+    public Set<Execution> getConcurrentlyFailedExecution() {
         return Collections.unmodifiableSet(concurrentlyFailedExecutions);
     }
+
+    /**
+     * @return Whether the current failure is a new attempt. True means that the current failure is

Review Comment:
   ```suggestion
        * @return True means that the current failure is
   ```
   nit: The first sentence does not add any more information. You could also go with the proposal I shared in `FailureHandlingResult` if you think it's a reasonable change.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -71,7 +74,10 @@ void setUp() {
         failoverStrategy = new TestFailoverStrategy();
         testingFailureEnricher = new TestingFailureEnricher();
-        backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS);
+        isNewAttempt = new AtomicBoolean(true);
+        backoffTimeStrategy =
+                new TestRestartBackoffTimeStrategy(
+                        true, RESTART_DELAY_MS, () -> isNewAttempt.get());

Review Comment:
   ```suggestion
                           true, RESTART_DELAY_MS, isNewAttempt::get);
   ```
   nit

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java:
##########
@@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy {
      * Notify the strategy about the task failure cause.
      *
      * @param cause of the task failure
+     * @return Whether the current failure is a new attempt. True means that the current failure is
+     *     a new attempt, false means that there has been a failure before and has not been tried
+     *     yet, and the current failure will be merged into the previous attempt.

Review Comment:
   ```suggestion
        * @return True means that the current failure is the first one after the most-recent failure
        *     handling happened, false means that there has been a failure before that was not handled,
        *     yet, and the current failure will be considered in a combined failure handling effort.
   ```
   nit: just another proposal to remove the "attempt" from the description.
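
   To make the proposed return-value semantics concrete, here is a minimal sketch, assuming a strategy that delays restarts and therefore collects failures until the pending restart is due. `CollectingStrategySketch`, its injected clock and the `restartDelayMs` parameter are hypothetical names for illustration only. This is not the code of `FixedDelayRestartBackoffTimeStrategy` or any other Flink class.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not Flink code): reports whether a failure is the first one since the
 * most recent failure handling, or whether it falls into a still-pending restart delay.
 */
final class CollectingStrategySketch {
    private final long restartDelayMs;
    private final LongSupplier clock; // assumption: injected so the sketch can be tested
    private boolean restartPending = false;
    private long pendingUntilMs;

    CollectingStrategySketch(long restartDelayMs, LongSupplier clock) {
        this.restartDelayMs = restartDelayMs;
        this.clock = clock;
    }

    /**
     * @return true if this failure starts a new failure handling cycle, false if an earlier
     *     failure is still waiting for its delayed restart and this one is merged into it
     */
    boolean notifyFailure(Throwable cause) {
        long now = clock.getAsLong();
        if (restartPending && now < pendingUntilMs) {
            // A restart is already scheduled: treat this failure as a concurrent one.
            return false;
        }
        // No restart is pending (or the delay has elapsed): this failure opens a new cycle.
        restartPending = true;
        pendingUntilMs = now + restartDelayMs;
        return true;
    }
}
```

   With a delay of 100 ms, a failure at t=0 opens a cycle, a failure at t=50 is merged into it, and a failure at t=100 opens the next cycle.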
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:
##########
@@ -79,11 +79,12 @@ public long getBackoffTime() {
     }
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         if (isFailureTimestampsQueueFull()) {
             failureTimestamps.remove();
         }
         failureTimestamps.add(clock.absoluteTimeMillis());
+        return true;

Review Comment:
   Same here: isn't that also collecting failures before doing the restart? If so, we would not have a "first attempt" every single time.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -171,6 +179,9 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(result.getFailureLabels().get())
                 .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        // NonRecoverableFailure is new attempt even if RestartBackoffTimeStrategy consider it's not
+        // new attempt.
+        assertThat(result.isNewAttempt()).isTrue();

Review Comment:
   ```suggestion
           assertThat(result.isNewAttempt())
                   .as(
                           "A NonRecoverableFailure should be new attempt even if RestartBackoffTimeStrategy consider it's not new attempt.")
                   .isTrue();
   ```
   Just another nitty hint: in tests, we don't need to add comments but could rather add a description to the assertion. That serves the same purpose as the comment but also makes the test execution more descriptive.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture<Map<String, String>> failureLabels,
             Iterable<Execution> executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // Handle all subsequent exceptions as the concurrent exceptions when it's not a new
+        // attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(
+                    latestRootExceptionEntry != null,
+                    "A root exception entry should exist if failureHandlingResult wasn't "
+                            + "generated as part of a new error handling cycle.");
+            List<Execution> concurrentlyExecutions = new ArrayList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+            latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+            return;
+        }
+
         if (failureHandlingResult.getRootCauseExecution().isPresent()) {

Review Comment:
   ```suggestion
           } else if (failureHandlingResult.getRootCauseExecution().isPresent()) {
   ```
   nit: Just a proposal to make it more obvious that we're handling three exclusive cases in this method. If you go with that proposal, you might want to move the comment into the if block.
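
   The three exclusive cases can be sketched roughly as follows. This is a simplified stand-in that uses plain strings instead of `Execution` and `RootExceptionHistoryEntry`. The names `ExceptionHistorySketch`, `RootEntry`, `archive` and `startNewEntry` are hypothetical and do not exist in `SchedulerBase`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch (not Flink code) of the three exclusive archiving cases. */
final class ExceptionHistorySketch {

    /** Simplified stand-in for a root exception history entry. */
    static final class RootEntry {
        final String rootCause;
        final List<String> concurrentFailures = new ArrayList<>();

        RootEntry(String rootCause) {
            this.rootCause = rootCause;
        }
    }

    final List<RootEntry> history = new ArrayList<>();
    private RootEntry latestRootEntry;

    void archive(boolean newCycle, Optional<String> rootCauseTask, List<String> concurrentTasks) {
        if (!newCycle) {
            // Case 1: follow-up failure, merged into the latest root entry.
            if (latestRootEntry == null) {
                throw new IllegalStateException("A root entry must exist for a follow-up failure.");
            }
            rootCauseTask.ifPresent(latestRootEntry.concurrentFailures::add);
            latestRootEntry.concurrentFailures.addAll(concurrentTasks);
        } else if (rootCauseTask.isPresent()) {
            // Case 2: first failure of a new cycle, caused by a specific task.
            startNewEntry(rootCauseTask.get(), concurrentTasks);
        } else {
            // Case 3: first failure of a new cycle without a root-cause task (global failure).
            startNewEntry("<global failure>", concurrentTasks);
        }
    }

    private void startNewEntry(String rootCause, List<String> concurrentTasks) {
        latestRootEntry = new RootEntry(rootCause);
        latestRootEntry.concurrentFailures.addAll(concurrentTasks);
        history.add(latestRootEntry);
    }
}
```

   Writing the branches as one `if` / `else if` / `else` chain removes the early `return` and makes it visible that exactly one case applies per call.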
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   The comment is obsolete because it could be expressed through the test method name(s) (to reduce redundant information in the code).

   Additionally, what's your opinion on splitting this big test up into smaller test methods? I see the benefit of having one big one: you can check the number of restarts at the same time. But having more specific test methods usually improves the readability of the test code. WDYT?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {

Review Comment:
   What about calling it `isRootCause` here, analogously to the exception history? ...or `isFirstErrorInNewErrorHandlingCycle` 🤔 Just as other, more descriptive proposals. Up to you, I'm not fighting for any version.
   :innocent:

##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);

Review Comment:
   Can you move these two lines further down to where they are actually used? (Alternatively, you could move the Predicate initialization up.) The goal would be to have the entire initialization in one place. Personally, I would prefer moving the code down. That gives less distraction to the reader, who can then focus on the first concurrent exception handling at the beginning. But up to you...
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);

Review Comment:
   Thinking about it: Another solution would be to create a dedicated test method `testAddConcurrentExceptions`. WDYT?
   :thinking:

##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -150,25 +164,24 @@ void testFromGlobalFailure() throws ExecutionException, InterruptedException {
                 ExceptionHistoryEntryMatcher.matchesGlobalFailure(
                         rootCause, rootTimestamp, rootFailureLabels.get()));
-        final Predicate<ExceptionHistoryEntry> exception0Predicate =
-                ExceptionHistoryEntryMatcher.matchesFailure(
-                        concurrentException0,
-                        concurrentExceptionTimestamp0,
-                        concurrentlyFailedExecutionVertex0.getTaskNameWithSubtaskIndex(),
-                        concurrentlyFailedExecutionVertex0.getCurrentAssignedResourceLocation());
-        final Predicate<ExceptionHistoryEntry> exception1Predicate =
-                ExceptionHistoryEntryMatcher.matchesFailure(
-                        concurrentException1,
-                        concurrentExceptionTimestamp1,
-                        concurrentlyFailedExecutionVertex1.getTaskNameWithSubtaskIndex(),
-                        concurrentlyFailedExecutionVertex1.getCurrentAssignedResourceLocation());
         assertThat(actualEntry.getConcurrentExceptions())
                 .allMatch(
                         exceptionHistoryEntry ->
                                 exception0Predicate.test(exceptionHistoryEntry)
                                         || exception1Predicate.test(exceptionHistoryEntry));
     }
+    private Predicate<ExceptionHistoryEntry> getExceptionHistoryEntryPredicate(

Review Comment:
   ```suggestion
       private Predicate<ExceptionHistoryEntry> triggerFailureAndCreateEntryMatcher(
   ```
   Based on the old name, I didn't expect the triggering of the failure. We might want to mention that in the method name because it's an essential part of each test scenario that the failure is actually registered through a state update in the `ExecutionGraph`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org