Hello,

We're experiencing an issue with operator 1.10 where:
1. A batch job finishes successfully.
2. The operator throws an error because it can't find the job ID.
3. The job doesn't get properly cleaned up and stays stuck in RUNNING state.
4. As a result, the Flink UI and JMs remain up; the UI shows FINISHED while the flinkdeployment CR shows RUNNING.

I tested the exact same flinkdeployment with operator 1.9 and observed no issues:

1. The Flink job runs and finishes successfully.
2. The Flink UI and JMs go down; the flinkdeployment CR shows FINISHED.

I was able to replicate the issue with 1.10 locally. Happy to share the flinkdeployment and operator configurations I used for the e2e test.

-----
*My Diagnosis:*

At first, I thought it was introduced by a change made to exception handling in [FLINK-35414] <https://github.com/Shopify/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java#L443>, specifically ReconciliationException, but that code actually looks fine. I then stumbled upon [FLINK-36673] <https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-36673>, which describes a very similar issue, though for FAILED deployments instead of FINISHED.

I'm now thinking the issue lies within JobStatusObserver <https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java#L75>. In 1.9, it used listJobs() <https://github.com/apache/flink-kubernetes-operator/blob/release-1.9.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java#L75> to get the job status, whereas in 1.10 it uses getJobStatus <https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java#L75> with a specific job ID. I believe the former handled a missing job gracefully, whereas the latter throws FlinkJobNotFoundException.
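To illustrate my reading of the difference, here is a minimal toy sketch (not the operator's actual code; ToyCluster and its method bodies are hypothetical stand-ins for the REST client): listing jobs simply yields an empty result once the JM has forgotten a completed job, so the observer can conclude "nothing running" and move on, while a targeted lookup by job ID surfaces an exception that the reconcile loop then hits on every pass.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the cluster-facing client the observer uses.
class ToyCluster {
    private final Map<String, String> jobs = new HashMap<>();

    void addJob(String id, String status) { jobs.put(id, status); }

    // Simulates the JM archiving/forgetting a finished job.
    void forgetJob(String id) { jobs.remove(id); }

    // 1.9-style observation: list everything; a vanished job is simply absent.
    List<String> listJobs() { return new ArrayList<>(jobs.values()); }

    // 1.10-style observation: targeted lookup; a vanished job is an error
    // (mirroring FlinkJobNotFoundException's behavior).
    String getJobStatus(String id) throws IllegalStateException {
        String status = jobs.get(id);
        if (status == null) {
            throw new IllegalStateException("Could not find Flink job (" + id + ")");
        }
        return status;
    }
}

public class ObserverSketch {
    public static void main(String[] args) {
        ToyCluster cluster = new ToyCluster();
        cluster.addJob("job-1", "FINISHED");
        cluster.forgetJob("job-1"); // batch job completed; JM no longer tracks it

        // listJobs(): empty list, which an observer can treat as "job gone, done".
        System.out.println("listJobs -> " + cluster.listJobs());

        // getJobStatus(): throws, so a retrying reconcile loop fails every time.
        try {
            cluster.getJobStatus("job-1");
        } catch (IllegalStateException e) {
            System.out.println("getJobStatus -> " + e.getMessage());
        }
    }
}
```

If this reading is right, the fix would presumably be for the observer (or its callers) to treat the not-found case as a terminal state rather than a reconciliation error.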
This leads to an infinite retry loop when trying to observe checkpoints for the completed job, which prevents the proper transition to FINISHED state and the subsequent cleanup.

Curious to hear if anyone has encountered this issue and if they have ideas or recommendations on how to proceed.

------
*Operator's log:*

org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (1809bb7471686f2b901f5ab54a943ac1)
org.apache.flink.kubernetes.operator.exception.ReconciliationException: Could not observe latest savepoint information
        at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getLastCheckpoint(AbstractFlinkService.java:546)
        at org.apache.flink.kubernetes.operator.observer.SnapshotObserver.observeLatestCheckpoint(SnapshotObserver.java:449)
        at org.apache.flink.kubernetes.operator.observer.SnapshotObserver.observeSavepointStatus(SnapshotObserver.java:93)
        at org.apache.flink.kubernetes.operator.observer.deployment.ApplicationObserver.observeFlinkCluster(ApplicationObserver.java:51)
        at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:71)
        at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)

Thank you,
Luca