pgaref commented on code in PR #22506: URL: https://github.com/apache/flink/pull/22506#discussion_r1188101108
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
Looks like the main decision we have to make here is whether the failure labels are going to be used by restart strategies or not; relying on enrichment for restarts is what makes it crucial. The existing implementation was [based on the assumption](https://lists.apache.org/thread/tq8yrncg7zqtpc8ddpxrkxfpovs1wkkw) that labels are going to be used by custom restart strategies in the future.
Since we wanted them to be asynchronous, the less risky way was to go through existing async calls, e.g. `JobMaster#updateTaskExecutionState`, and probably modify the `InternalFailuresListener` (rather than changing the `SchedulerNG` state update to async). Deciding that failure enrichment is crucial enough to be synchronous, maybe as part of `DefaultScheduler#restartTasksWithDelay`, is also an option. However, decoupling failure labels completely from restart strategies sounds like a step back here.
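
To make the two options above a bit more concrete, here is a rough standalone sketch. It uses plain `CompletableFuture` and no Flink classes; `TaskUpdate`, `labelFailure`, `shouldRestart`, `decideAsync` and `decideSync` are all made-up names, and the hard-coded `type=USER` label only stands in for whatever a real enricher would produce. It is meant to show the shape of the trade-off, not how the scheduler should actually be wired.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustration only: every name in this class is hypothetical, none is a Flink API. */
final class FailureLabelSketch {

    /** Stand-in for a task state update that may carry failure labels. */
    static final class TaskUpdate {
        final String state;
        final Map<String, String> labels;

        TaskUpdate(String state, Map<String, String> labels) {
            this.state = state;
            this.labels = labels;
        }
    }

    /** Stand-in for a potentially IO-heavy enrichment step; the label is hard-coded. */
    static CompletableFuture<TaskUpdate> labelFailure(TaskUpdate update, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> new TaskUpdate(update.state, Map.of("type", "USER")), executor);
    }

    /** Stand-in for a custom restart strategy that consults the labels. */
    static boolean shouldRestart(TaskUpdate update) {
        return !"USER".equals(update.labels.get("type"));
    }

    /** Option 1: label on an IO executor, then decide on the main-thread executor. */
    static CompletableFuture<Boolean> decideAsync(
            TaskUpdate update, Executor ioExecutor, Executor mainThreadExecutor) {
        return labelFailure(update, ioExecutor)
                .thenApplyAsync(FailureLabelSketch::shouldRestart, mainThreadExecutor);
    }

    /** Option 2: label synchronously on the calling thread, blocking until done. */
    static boolean decideSync(TaskUpdate update) {
        // Runnable::run is a direct executor: the supplier runs on the caller's thread.
        return shouldRestart(labelFailure(update, Runnable::run).join());
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            TaskUpdate failed = new TaskUpdate("FAILED", Map.of());
            System.out.println("async restart decision: "
                    + decideAsync(failed, io, mainThread).join());
            System.out.println("sync restart decision:  " + decideSync(failed));
        } finally {
            io.shutdown();
            mainThread.shutdown();
        }
    }
}
```

In both variants the restart decision only runs once the labels exist, which is the property a label-aware restart strategy would rely on. The difference is where the waiting happens: on the caller's future in the first case, on the deciding thread itself in the second.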