XComp commented on a change in pull request #15049:
URL: https://github.com/apache/flink/pull/15049#discussion_r594292941



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
##########
@@ -132,7 +142,75 @@ private static JobExceptionsInfo createJobExceptionsInfo(
             }
         }
 
-        return new JobExceptionsInfo(
-                rootExceptionMessage, rootTimestamp, taskExceptionList, 
truncated);
+        final ErrorInfo rootCause = executionGraph.getFailureInfo();
+        return new JobExceptionsInfoWithHistory(
+                rootCause.getExceptionAsString(),
+                rootCause.getTimestamp(),
+                taskExceptionList,
+                truncated,
+                createJobExceptionHistory(
+                        executionGraphInfo.getExceptionHistory(), 
exceptionToReportMaxSize));
+    }
+
+    static JobExceptionsInfoWithHistory.JobExceptionHistory 
createJobExceptionHistory(
+            Iterable<ExceptionHistoryEntry> historyEntries, int limit) {
+        // we need to reverse the history to have a stable result when doing paging on it
+        final List<ExceptionHistoryEntry> reversedHistoryEntries = new 
ArrayList<>();
+        Iterables.addAll(reversedHistoryEntries, historyEntries);
+        Collections.reverse(reversedHistoryEntries);
+
+        List<JobExceptionsInfoWithHistory.ExceptionInfo> 
exceptionHistoryEntries =
+                reversedHistoryEntries.stream()
+                        .limit(limit)
+                        .map(JobExceptionsHandler::createExceptionInfo)
+                        .collect(Collectors.toList());
+
+        return new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                exceptionHistoryEntries,
+                exceptionHistoryEntries.size() < 
reversedHistoryEntries.size());
+    }
+
+    private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
+            ExceptionHistoryEntry historyEntry) {
+        if (historyEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    historyEntry.getException().getOriginalErrorClassName(),
+                    historyEntry.getExceptionAsString(),
+                    historyEntry.getTimestamp());
+        }
+
+        Preconditions.checkArgument(
+                historyEntry.getFailingTaskName() != null,
+                "The taskName must not be null for a non-global failure.");
+        Preconditions.checkArgument(
+                historyEntry.getTaskManagerLocation() != null,
+                "The location must not be null for a non-global failure.");
+
+        return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                historyEntry.getException().getOriginalErrorClassName(),
+                historyEntry.getExceptionAsString(),
+                historyEntry.getTimestamp(),
+                historyEntry.getFailingTaskName(),
+                toString(historyEntry.getTaskManagerLocation()));
+    }
+
+    @VisibleForTesting
+    @Nullable
+    static String toString(@Nullable TaskManagerLocation location) {
+        return location != null
+                ? taskManagerLocationToString(location.getFQDNHostname(), 
location.dataPort())
+                : "(unassigned)";
+    }
+
+    @VisibleForTesting
+    @Nullable
+    static String toString(@Nullable 
ExceptionHistoryEntry.ArchivedTaskManagerLocation location) {
+        return location != null
+                ? taskManagerLocationToString(location.getFQDNHostname(), 
location.getPort())
+                : null;

Review comment:
       Good idea 👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to