Hello,

We're experiencing an issue with operator 1.10 where:

   1. A batch job will successfully finish
   2. The operator throws an error because it can't find the job ID
   3. The job doesn't get properly cleaned up and stays stuck in RUNNING
   state
   4. As a result, the Flink UI and JMs remain up; the UI shows FINISHED
   and the flinkdeployment CR shows RUNNING

I tested the exact same flinkdeployment with operator 1.09 and observed no
issues.

   1. The Flink job runs and finishes successfully
   2. The Flink UI and JMs go down; the flinkdeployment CR shows FINISHED

I was able to replicate the issue with 1.10 locally. Happy to share the
flinkdeployment and operator configurations I used for the e2e test.


-----
*My Diagnosis:*
At first, I thought it was introduced by a change made to exception
handling in [FLINK-35414]
<https://github.com/Shopify/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java#L443>,
specifically ReconciliationException, but it actually looks ok. I then
stumbled upon [FLINK-36673]
<https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-36673> which
describes a very similar issue - though for FAILED deployments instead of
FINISHED.

I'm now thinking the issue lies within JobStatusObserver
<https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java#L75>.
In 1.09, it used listJobs()
<https://github.com/apache/flink-kubernetes-operator/blob/release-1.9.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java#L75>
to get job status, whereas in 1.10, it uses getJobStatus
<https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java#L75>
with
a specific job ID. I think the former handled it gracefully, whereas the
latter throws FlinkJobNotFoundException.
This leads to an infinite retry loop when trying to observe checkpoints for
completed jobs. This prevents the proper transition to FINISHED state and
subsequent cleanup.

Curious to hear if anyone has encountered this issue and if they have ideas
or recommendations on how to proceed.


------
*Operator's log:*
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
Flink job (1809bb7471686f2b901f5ab54a943ac1)

org.apache.flink.kubernetes.operator.exception.ReconciliationException:
Could not observe latest savepoint information
    at
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getLastCheckpoint(AbstractFlinkService.java:546)
    at
org.apache.flink.kubernetes.operator.observer.SnapshotObserver.observeLatestCheckpoint(SnapshotObserver.java:449)
    at
org.apache.flink.kubernetes.operator.observer.SnapshotObserver.observeSavepointStatus(SnapshotObserver.java:93)
    at
org.apache.flink.kubernetes.operator.observer.deployment.ApplicationObserver.observeFlinkCluster(ApplicationObserver.java:51)
    at
org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:71)
    at
org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
    at
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)


Thank you,
Luca

Reply via email to