zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1195539179


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##########
@@ -51,22 +57,35 @@ public class ExecutionFailureHandler {
     /** Number of all restarts happened since this job is submitted. */
     private long numberOfRestarts;
 
+    private final Context taskFailureCtx;
+    private final Context globalFailureCtx;

Review Comment:
   IIUC, we are creating two contexts to host different `FailureType`s?
   This does not seem very flexible. Later we will need to add one more
   context for the `TASK_MANAGER` type, and maybe even more if more types are
   introduced.
   
   Also, as @dmvk mentioned in
   https://github.com/apache/flink/pull/22506#discussion_r1195408929, we may need
   to provide more information about the error to the enrichers, e.g. the related
   task. Such information can be determined only after an error occurs. So maybe a
   more flexible way is to create a new context for each failure, i.e. create it
   from `failedExecution` in `handleFailure()`.
   
   It's also fine for me to do it in a follow-up task.
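
   To illustrate the per-failure context idea, here is a rough sketch. All names
   in it (`PerFailureContextSketch`, `FailureContext`, `forFailure`, and the
   `String` task id) are hypothetical stand-ins and not the actual Flink classes;
   it only assumes that a task failure carries some identifier of the failed
   execution and a global failure does not.

```java
import java.util.Optional;

/** Illustrative only: hypothetical stand-ins, not the actual Flink types. */
final class PerFailureContextSketch {

    /** Assumed failure categories; more (e.g. TASK_MANAGER) could be added later. */
    enum FailureType { GLOBAL, TASK }

    /** Immutable context built once per failure, so it can carry failure-specific details. */
    static final class FailureContext {
        private final FailureType type;
        private final String failedTask; // null for global failures

        private FailureContext(FailureType type, String failedTask) {
            this.type = type;
            this.failedTask = failedTask;
        }

        /** Derives the context from the failure itself instead of a pre-built field. */
        static FailureContext forFailure(String failedTask) {
            return failedTask == null
                    ? new FailureContext(FailureType.GLOBAL, null)
                    : new FailureContext(FailureType.TASK, failedTask);
        }

        FailureType getType() {
            return type;
        }

        Optional<String> getFailedTask() {
            return Optional.ofNullable(failedTask);
        }
    }

    /** Stand-in for handleFailure(): the context is created when the error occurs. */
    static FailureContext handleFailure(String failedExecution, Throwable cause) {
        FailureContext ctx = FailureContext.forFailure(failedExecution);
        // The enrichers would be invoked with (cause, ctx) at this point.
        return ctx;
    }
}
```

   With this shape, adding a new failure type only means extending the enum and
   the factory, not adding another field to the handler.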



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -66,6 +76,13 @@ public static Throwable handleMissingThrowable(@Nullable Throwable throwable) {
     }
 
     public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
+        this(exception, timestamp, FailureEnricherUtils.EMPTY_LABELS);

Review Comment:
   Given that not all `ErrorInfo` instances will be labeled, maybe we can
   introduce the label-related fields directly in the subclass
   `ExceptionHistoryEntry`?
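
   Roughly what I have in mind, as a simplified sketch. The two nested classes
   only mirror the names `ErrorInfo` and `ExceptionHistoryEntry`; their fields
   and constructors here are assumptions for illustration, not the real
   signatures.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: simplified stand-ins for the classes discussed above. */
final class LabeledEntrySketch {

    /** Base class stays label-free, so unlabeled errors need no placeholder value. */
    static class ErrorInfo {
        private final Throwable exception;
        private final long timestamp;

        ErrorInfo(Throwable exception, long timestamp) {
            this.exception = exception;
            this.timestamp = timestamp;
        }

        Throwable getException() {
            return exception;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    /** Only the history entry carries labels. */
    static final class ExceptionHistoryEntry extends ErrorInfo {
        private final Map<String, String> labels;

        ExceptionHistoryEntry(Throwable exception, long timestamp, Map<String, String> labels) {
            super(exception, timestamp);
            // Defensive copy so later changes to the caller's map do not leak in.
            this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
        }

        Map<String, String> getLabels() {
            return labels;
        }
    }
}
```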
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -295,7 +300,7 @@ void initializeVerticesIfPossible() {
             }
         } catch (JobException ex) {
             log.error("Unexpected error occurred when initializing ExecutionJobVertex", ex);
-            failJob(ex, System.currentTimeMillis());
+            failJob(ex, System.currentTimeMillis(), FailureEnricherUtils.EMPTY_LABELS);

Review Comment:
   +1 to label this error, just like all other errors.
   One simple way is to replace `failJob()` with `handleGlobalFailure()` and
   pass it an unrecoverable error, e.g. a `SuppressRestartsException`.
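
   A simplified sketch of that control flow, using local stand-ins rather than
   the real scheduler classes. The `SuppressRestartsException` below is a
   stand-in for the Flink class of the same name, the string return values are
   invented for illustration, and the catch clause uses `RuntimeException`
   where the real code catches `JobException`.

```java
/** Illustrative only: local stand-ins, not the real scheduler code. */
final class GlobalFailureSketch {

    /** Stand-in for an exception type that marks a failure as unrecoverable. */
    static final class SuppressRestartsException extends RuntimeException {
        SuppressRestartsException(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in for the common failure path, where labeling would also happen. */
    static String handleGlobalFailure(Throwable error) {
        // Every error routed through here would pass the failure enrichers,
        // so no special-cased empty labels are needed by the caller.
        boolean restartable = !(error instanceof SuppressRestartsException);
        return restartable ? "RESTART" : "FAIL_JOB";
    }

    /** Stand-in for the vertex initialization step shown in the diff above. */
    static String initializeVertices(Runnable init) {
        try {
            init.run();
            return "OK";
        } catch (RuntimeException ex) {
            // Wrap the error so the common path fails the job instead of restarting.
            return handleGlobalFailure(new SuppressRestartsException(ex));
        }
    }
}
```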



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -39,6 +44,10 @@ public class ErrorInfo implements Serializable {
 
     private final long timestamp;
 
+    private final transient CompletableFuture<Map<String, String>> labelsFuture;
+    /** Labels associated with the exception, set as soon as labelsFuture is completed. */
+    private Map<String, String> labels;

Review Comment:
   > we should just make sure labels field is marked as volatile, because it 
could be set from a different thread
   
   Maybe we can change `FailureEnricherUtils#labelFailure()` to return a future
   that is completed in the main thread. That would avoid unexpected bugs if
   someone who is unaware of this threading detail later adds callbacks to the
   future.
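
   A minimal sketch of what "completed in the main thread" could look like with
   plain `CompletableFuture`. The helper `completeOnMainThread` and the thread
   names are hypothetical; it assumes the caller has an `Executor` that runs
   tasks on the main thread, and it is not the actual `labelFailure()`
   implementation.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: hypothetical helper, not part of FailureEnricherUtils. */
final class MainThreadFutureSketch {

    /**
     * Returns a future with the same outcome as {@code source}, but completed by
     * {@code mainThreadExecutor}. whenCompleteAsync hops threads for both normal
     * and exceptional completion.
     */
    static <T> CompletableFuture<T> completeOnMainThread(
            CompletableFuture<T> source, Executor mainThreadExecutor) {
        return source.whenCompleteAsync((value, error) -> { }, mainThreadExecutor);
    }

    /** Returns the name of the thread that ran a callback registered on the wrapped future. */
    static String demo() {
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-sketch"));
        try {
            CompletableFuture<Map<String, String>> labelsFromIoThread = new CompletableFuture<>();

            // Callback is registered before completion, so it runs in the completing thread.
            CompletableFuture<String> callbackThread =
                    completeOnMainThread(labelsFromIoThread, mainThread)
                            .thenApply(labels -> Thread.currentThread().getName());

            // Simulate an enricher finishing on some other thread.
            Thread io = new Thread(
                    () -> labelsFromIoThread.complete(Collections.singletonMap("type", "user")),
                    "io-thread-sketch");
            io.start();

            return callbackThread.join(); // "main-thread-sketch"
        } finally {
            mainThread.shutdown();
        }
    }
}
```

   Callbacks added without an explicit executor then run on the main thread as
   long as they are registered from the main thread as well, so the `labels`
   field would not need to be `volatile` in that setup.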



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
