zhuzhurk commented on code in PR #22506: URL: https://github.com/apache/flink/pull/22506#discussion_r1195539179
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##########
@@ -51,22 +57,35 @@ public class ExecutionFailureHandler {
     /** Number of all restarts happened since this job is submitted. */
     private long numberOfRestarts;
+    private final Context taskFailureCtx;
+    private final Context globalFailureCtx;

Review Comment:
IIUC, we are creating two contexts to host the different `FailureType`s? This does not seem very flexible. Later we would need to add one more context for the `TASK_MANAGER` type, and possibly more if further types are introduced.

Also, as @dmvk mentioned in https://github.com/apache/flink/pull/22506#discussion_r1195408929, we may need to provide more information about the error to the enrichers, e.g. the related task. Such information can be determined only after an error occurs.

So maybe a more flexible way is to create a new context for each failure, i.e. create it from `failedExecution` in `handleFailure()`.

I'm also fine with doing this in a follow-up task.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -66,6 +76,13 @@ public static Throwable handleMissingThrowable(@Nullable Throwable throwable) {
     }
     public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
+        this(exception, timestamp, FailureEnricherUtils.EMPTY_LABELS);

Review Comment:
Given that not all `ErrorInfo` instances will be labeled, maybe we can introduce the label-related fields directly in the subclass `ExceptionHistoryEntry`?
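
To make the `ExceptionHistoryEntry` suggestion a bit more concrete, here is a rough sketch of the shape I have in mind. It is only an illustration: `ErrorInfoSketch`, `ExceptionHistoryEntrySketch` and `LabelSketchDemo` are hypothetical, simplified stand-ins rather than the actual Flink classes, and the sketch ignores serialization and the asynchronous `labelsFuture` entirely.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Small demo entry point (hypothetical, for illustration only). */
class LabelSketchDemo {
    public static void main(String[] args) {
        Map<String, String> labels = Collections.singletonMap("failureType", "USER");
        ExceptionHistoryEntrySketch entry =
                new ExceptionHistoryEntrySketch(new RuntimeException("boom"), 42L, labels);
        System.out.println(entry.getLabels());
    }
}

/** Simplified stand-in for ErrorInfo: it carries no label-related state at all. */
class ErrorInfoSketch {
    private final Throwable exception;
    private final long timestamp;

    ErrorInfoSketch(Throwable exception, long timestamp) {
        this.exception = exception;
        this.timestamp = timestamp;
    }

    Throwable getException() {
        return exception;
    }

    long getTimestamp() {
        return timestamp;
    }
}

/** Simplified stand-in for ExceptionHistoryEntry: the only class that knows about labels. */
class ExceptionHistoryEntrySketch extends ErrorInfoSketch {
    private final Map<String, String> labels;

    ExceptionHistoryEntrySketch(Throwable exception, long timestamp, Map<String, String> labels) {
        super(exception, timestamp);
        // Read-only copy, so later changes to the caller's map do not leak into the entry.
        this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
    }

    Map<String, String> getLabels() {
        return labels;
    }
}
```

With this shape, callers that only need an unlabeled `ErrorInfo` keep using the two-argument constructor, and no `EMPTY_LABELS` default is needed in the base class.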
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -295,7 +300,7 @@ void initializeVerticesIfPossible() {
                 }
             } catch (JobException ex) {
                 log.error("Unexpected error occurred when initializing ExecutionJobVertex", ex);
-                failJob(ex, System.currentTimeMillis());
+                failJob(ex, System.currentTimeMillis(), FailureEnricherUtils.EMPTY_LABELS);

Review Comment:
+1 to labeling this error, just like all other errors. One simple way is to replace `failJob()` with `handleGlobalFailure()` and report an unrecoverable error, e.g. a `SuppressRestartsException`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -39,6 +44,10 @@ public class ErrorInfo implements Serializable {
     private final long timestamp;
+    private final transient CompletableFuture<Map<String, String>> labelsFuture;
+    /** Labels associated with the exception, set as soon as labelsFuture is completed. */
+    private Map<String, String> labels;

Review Comment:
> we should just make sure labels field is marked as volatile, because it could be set from a different thread

Maybe we can change `FailureEnricherUtils#labelFailure()` to return a future that is completed in the main thread. That would avoid unexpected bugs if someone who is unaware of this threading detail later adds callbacks to the future.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org