zhuzhurk commented on code in PR #22506: URL: https://github.com/apache/flink/pull/22506#discussion_r1186655239
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) { @Override public CompletableFuture<Acknowledge> updateTaskExecutionState( final TaskExecutionState taskExecutionState) { - FlinkException taskExecutionException; + checkNotNull(taskExecutionState, "taskExecutionState"); + // Use the main/caller thread for all updates to make sure they are processed in order. + // (MainThreadExecutor i.e., the akka thread pool does not guarantee that) + // Only detach for a FAILED state update that is terminal and may perform io heavy labeling. + if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) { + return labelFailure(taskExecutionState) + .thenApplyAsync( + taskStateWithLabels -> { + try { + return doUpdateTaskExecutionState(taskStateWithLabels); + } catch (FlinkException e) { + throw new CompletionException(e); + } + }, + getMainThreadExecutor()); + } try { - checkNotNull(taskExecutionState, "taskExecutionState"); + return CompletableFuture.completedFuture( + doUpdateTaskExecutionState(taskExecutionState)); + } catch (FlinkException e) { + return FutureUtils.completedExceptionally(e); + } + } + private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState) + throws FlinkException { + @Nullable FlinkException taskExecutionException; + try { if (schedulerNG.updateTaskExecutionState(taskExecutionState)) { Review Comment: If the goal is to label all failures, including those happens within JM, given that the labeling can happen asynchronously, I think it's not avoidable to break current synchronous failure handling process into asynchronous ones. I agree that there is risk to make this kind of change. So maybe we can combine this new asynchronous labeling process with existing asynchronous processes, to avoid introduce unexpected behavior changes. For example, for DefaultScheduler, we can invoke `FailureEnricherUtils#labelFailure(...)` in `ExecutionFailureHandler#handleFailure(...)` and directly record the result `CompletableFuture<Map<String, String>>` in `FailureHandlingResult`. Later in `DefaultScheduler#restartTasksWithDelay(...)`, we can wait until the future to be completed before invoking `archiveFromFailureHandlingResult(...)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org