gyfora commented on code in PR #978:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/978#discussion_r2077428513
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -299,9 +303,92 @@ public boolean
reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
return true;
}
+ // check for JobManager exceptions if the REST API server is still up.
+ if (!ReconciliationUtils.isJobInTerminalState(deployment.getStatus()))
{
+ observeJobManagerExceptions(ctx, deployment, observeConfig);
+ }
+
return cleanupTerminalJmAfterTtl(ctx.getFlinkService(), deployment,
observeConfig);
}
+ private void observeJobManagerExceptions(
+ FlinkResourceContext<FlinkDeployment> ctx,
+ FlinkDeployment deployment,
+ Configuration observeConfig) {
+ try {
+ var jobId =
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ var history = ctx.getFlinkService().getJobExceptions(deployment,
jobId, observeConfig);
+ if (history == null || history.getExceptionHistory() == null) {
+ return;
+ }
+ var exceptionHistory = history.getExceptionHistory();
+ var exceptions = exceptionHistory.getEntries();
+ if (exceptions.isEmpty()) {
+ LOG.info(String.format("No exceptions found in job exception
history for jobId '%s'.", jobId));
+ return;
+ }
+ if (exceptionHistory.isTruncated()) {
+ LOG.warn(String.format("Job exception history is truncated for
jobId '%s'. "
+ + "Some exceptions are not shown.", jobId));
+ }
+ for (var exception : exceptions) {
+ emitJobManagerExceptionEvent(ctx, deployment, exception);
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not fetch JobManager exception info.", e);
+ }
+ }
+
+ private void emitJobManagerExceptionEvent(
+ FlinkResourceContext<FlinkDeployment> ctx,
+ FlinkDeployment deployment,
+ JobExceptionsInfoWithHistory.RootExceptionInfo exception) {
+
+ String message = exception.getExceptionName();
+ if (message == null || message.isBlank()) {
+ return;
+ }
+
+ String stacktrace = exception.getStacktrace();
+ String taskName = exception.getTaskName();
+ String endpoint = exception.getEndpoint();
+ String tmId = exception.getTaskManagerId();
+ Map<String, String> labels = exception.getFailureLabels();
+ String time =
DateTimeUtils.readable(Instant.ofEpochMilli(exception.getTimestamp()),
ZoneId.systemDefault());
+
+ StringBuilder combined = new StringBuilder();
+ combined.append("JobManager Exception at ").append(time).append(":\n");
+ combined.append(message).append("\n\n");
+
+ if (taskName != null) {
+ combined.append("Task: ").append(taskName).append("\n");
+ }
+ if (endpoint != null) {
+ combined.append("Endpoint: ").append(endpoint).append("\n");
+ }
+ if (tmId != null) {
+ combined.append("TaskManager ID: ").append(tmId).append("\n");
+ }
+
Review Comment:
If you think the stack trace is useful in general and you would like to have
it we can start with that. And introduce configs later if we feel that it's too
verbose.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]