gyfora commented on code in PR #978:
URL: https://github.com/apache/flink-kubernetes-operator/pull/978#discussion_r2077396326

##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java:
##########
@@ -280,7 +280,8 @@ public KubernetesClient getKubernetesClient() {
     /** The type of the events. */
     public enum Type {
         Normal,
-        Warning
+        Warning,
+        Error

Review Comment:
   We could keep using the `Warning` type here. I wouldn't consider these events more severe than other operator errors that are recorded as warnings, especially since the job often recovers from them.

##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java:
##########
@@ -315,6 +316,7 @@ public enum Reason {
         UnsupportedFlinkVersion,
         SnapshotError,
         SnapshotAbandoned,
+        JobManagerException,

Review Comment:
   Should this simply be `JobException`?

##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -299,9 +303,92 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
             return true;
         }
 
+        // check for JobManager exceptions if the REST API server is still up.
+        if (!ReconciliationUtils.isJobInTerminalState(deployment.getStatus())) {
+            observeJobManagerExceptions(ctx, deployment, observeConfig);
+        }
+
         return cleanupTerminalJmAfterTtl(ctx.getFlinkService(), deployment, observeConfig);
     }
 
+    private void observeJobManagerExceptions(
+            FlinkResourceContext<FlinkDeployment> ctx,
+            FlinkDeployment deployment,
+            Configuration observeConfig) {
+        try {
+            var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+            var history = ctx.getFlinkService().getJobExceptions(deployment, jobId, observeConfig);
+            if (history == null || history.getExceptionHistory() == null) {
+                return;
+            }
+            var exceptionHistory = history.getExceptionHistory();
+            var exceptions = exceptionHistory.getEntries();
+            if (exceptions.isEmpty()) {
+                LOG.info(String.format("No exceptions found in job exception history for jobId '%s'.", jobId));
+                return;
+            }
+            if (exceptionHistory.isTruncated()) {
+                LOG.warn(String.format("Job exception history is truncated for jobId '%s'. " +
+                        "Some exceptions are not shown.", jobId));
+            }
+            for (var exception : exceptions) {
+                emitJobManagerExceptionEvent(ctx, deployment, exception);
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not fetch JobManager exception info.", e);
+        }
+    }
+
+    private void emitJobManagerExceptionEvent(
+            FlinkResourceContext<FlinkDeployment> ctx,
+            FlinkDeployment deployment,
+            JobExceptionsInfoWithHistory.RootExceptionInfo exception) {
+
+        String message = exception.getExceptionName();
+        if (message == null || message.isBlank()) {
+            return;
+        }
+
+        String stacktrace = exception.getStacktrace();
+        String taskName = exception.getTaskName();
+        String endpoint = exception.getEndpoint();
+        String tmId = exception.getTaskManagerId();
+        Map<String, String> labels = exception.getFailureLabels();
+        String time = DateTimeUtils.readable(Instant.ofEpochMilli(exception.getTimestamp()), ZoneId.systemDefault());
+
+        StringBuilder combined = new StringBuilder();
+        combined.append("JobManager Exception at ").append(time).append(":\n");
+        combined.append(message).append("\n\n");
+
+        if (taskName != null) {
+            combined.append("Task: ").append(taskName).append("\n");
+        }
+        if (endpoint != null) {
+            combined.append("Endpoint: ").append(endpoint).append("\n");
+        }
+        if (tmId != null) {
+            combined.append("TaskManager ID: ").append(tmId).append("\n");
+        }
+

Review Comment:
   I think we should simplify the reported error here. Adding labels, the full stack trace, etc. is way out of scope for Kubernetes events.
   
   For comparison, errors that happen within the operator, or anywhere we have an Exception object, are reported in a simplified format such as:
   ```
   Cause 1 -> Cause 2 -> Cause 3
   ```
   For this we use `ExceptionUtils#getExceptionMessage`. I understand that the format here is a bit different, but we should try to make this slimmer and more digestible :)
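
To make the last comment concrete, here is a minimal sketch of how a `Cause 1 -> Cause 2 -> Cause 3` style message could be derived when only the stack trace string from the REST response is available, rather than an Exception object. Everything in it is hypothetical: `CompactExceptionMessage`, `fromStacktrace`, and the `MAX_LENGTH` cap are illustrative names and are not part of the operator or of `ExceptionUtils`. It also assumes the stack trace string follows the usual JVM layout, with a first line for the root exception and `Caused by: ` lines for nested causes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not operator code): builds a compact one-line event message. */
final class CompactExceptionMessage {

    /** Assumed cap on the event message length; the value is illustrative only. */
    static final int MAX_LENGTH = 256;

    private static final String CAUSED_BY = "Caused by: ";

    private CompactExceptionMessage() {}

    /**
     * Keeps the first line of the stack trace and every "Caused by: " line, drops all
     * stack frames, and joins the result as "Cause 1 -> Cause 2 -> Cause 3".
     * Falls back to the exception name when no stack trace is available.
     */
    static String fromStacktrace(String exceptionName, String stacktrace) {
        List<String> causes = new ArrayList<>();
        if (stacktrace != null) {
            for (String line : stacktrace.split("\\R")) {
                String trimmed = line.trim();
                if (trimmed.startsWith(CAUSED_BY)) {
                    // Nested cause: strip the prefix, keep "Type: message".
                    causes.add(trimmed.substring(CAUSED_BY.length()));
                } else if (causes.isEmpty() && !trimmed.isEmpty() && !trimmed.startsWith("at ")) {
                    // First meaningful line: the root exception itself.
                    causes.add(trimmed);
                }
                // Everything else ("at ..." frames, "... N more") is skipped.
            }
        }
        if (causes.isEmpty()) {
            return exceptionName;
        }
        String joined = String.join(" -> ", causes);
        // Truncate so a long chain cannot bloat the Kubernetes event.
        return joined.length() <= MAX_LENGTH
                ? joined
                : joined.substring(0, MAX_LENGTH - 3) + "...";
    }
}
```

With something along these lines, `emitJobManagerExceptionEvent` would only need to pass `exception.getExceptionName()` and `exception.getStacktrace()` and could drop the per-field `StringBuilder` block. Whether task name or TaskManager ID still belong in the event is a separate decision.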