Adding a bit more context to the puzzle – and tagging in Gyula based on git blame 😊
I’ve been reading through https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink and the associated code. I can see that there are two ConfigMaps created - <deployment>-cluster-config-map and <deployment>-<jobid>-config-map. The former holds the HA metadata for the JobManagers; the latter holds the HA metadata for a specific job. When a job fails, the job-specific <jobid>-config-map is removed, and the job-graph is removed from the <deployment>-cluster-config-map. The job-specific ConfigMap is the one that the Operator is looking for when trying to recover the JM.

If I delete a JM pod once the job has failed, it seems that the new pod will create a FileSystemJobResultStore (https://github.com/apache/flink/blob/997b48340d2aac1de48c0788b4204d660e34cedd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java#L102) that loads completed jobs from high-availability.storageDir. The pod then tries to submit the ApplicationCluster job, which fails because the job is recorded as failed in the job result store. The JobManager also appears to create a CompletedCheckpointStore (https://github.com/apache/flink/blob/997b48340d2aac1de48c0788b4204d660e34cedd/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java#L91) which loads from the HA metadata AND falls back to the file system (https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L308). As a result, it seems that a JobManager pod deleted after a failed ApplicationCluster job can restore to a working state (in so much as there is no running job that had previously failed).

From the Operator perspective - I can see that https://issues.apache.org/jira/browse/FLINK-30437 resulted in changing the check to require the HA job metadata before deploying an ApplicationCluster, but it’s not clear exactly why. At face value it looks like the underlying JobManagers can recover from disk if the HA metadata is missing.

--
Nic Townsend
IBM Event Processing
Senior Engineer / Technical Lead

Slack: @nictownsend
Bluesky: @nict0wnsend.bsky.social

From: Nic Townsend <nictowns...@uk.ibm.com>
Date: Tuesday, 14 January 2025 at 19:13
To: user@flink.apache.org <user@flink.apache.org>
Subject: [EXTERNAL] Re: Restoring lost JobManager deployment

Thinking a bit more - is this a bug with fabric8? I’m assuming that the presence of the config map means the JM will try to load HA, and that without the map it starts the job from the savepoint dir. The config map exists - but the fabric8 configmap check is returning false, so the operator won't create the deployment. If it were told the config map existed, then the JM deployment could be recreated and it would behave the same as if the pod had restarted. The only caveat would be that, I assume, when the deployment was deleted we updated spec.error, so the original job error would be lost.
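To make that hypothesis concrete, this is roughly the kind of lookup I have been testing against my cluster. A minimal sketch using the fabric8 client (6.x); the namespace/name are placeholders for my test setup, and the label selector and the "is it empty" reasoning are assumptions on my part rather than the operator's actual code:

    import java.util.List;
    import java.util.Map;

    import io.fabric8.kubernetes.api.model.ConfigMap;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;

    public class HaConfigMapCheck {
        public static void main(String[] args) {
            try (KubernetesClient client = new KubernetesClientBuilder().build()) {
                String namespace = "sp";                 // my test namespace
                String clusterId = "sql-runner-deploy";  // the FlinkDeployment name

                // Direct lookup by name: the <deployment>-cluster-config-map does still exist.
                ConfigMap byName = client.configMaps()
                        .inNamespace(namespace)
                        .withName(clusterId + "-cluster-config-map")
                        .get();
                System.out.println("exists: " + (byName != null));
                Map<String, String> data = byName == null ? null : byName.getData();
                System.out.println("data:   " + data);

                // Label-based lookup, roughly how I assume the operator finds HA ConfigMaps.
                // The exact labels are an assumption on my part and may differ.
                List<ConfigMap> haMaps = client.configMaps()
                        .inNamespace(namespace)
                        .withLabel("app", clusterId)
                        .withLabel("configmap-type", "high-availability")
                        .list()
                        .getItems();
                System.out.println("HA maps by label: " + haMaps.size());
            }
        }
    }

My suspicion is that the map still exists but its data is effectively empty (leader info gone, no job-graph or checkpoint keys), so whatever non-empty check the operator applies on top of the lookup comes back false.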
________________________________
From: Nic Townsend <nictowns...@uk.ibm.com>
Sent: 14 January 2025 17:48
To: user@flink.apache.org <user@flink.apache.org>
Subject: [EXTERNAL] Restoring lost JobManager deployment

Hi all,

I’m looking for some help understanding a behaviour of the k8s operator when recovering a JobManager deployment. The scenario is an ApplicationCluster, with HA enabled and an upgrade mode of savepoint. The job is in a failed state – so it’s not in the HA config map.

* If I delete the JM pod, the leader information is removed from the HA config map (so it’s now empty). k8s creates a new pod – and when it starts, the application job is submitted and fails because the persisted HA data knows the job was in a failed state, even without the HA metadata:

    WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'sql-runner-deploy' (0bb6676ed5cf22dded08018f1aca7794) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.

* If I change the spec of a failed job or use restartNonce – the operator will delete the HA config map, then delete and create a new JM deployment. The ApplicationCluster then resumes from the checkpoint. My guess is that using the restart nonce tells the operator not to look for HA metadata because it knows it personally deleted the CM. The JobManager clears out the persisted HA data on startup if the CM is missing, and the job can be re-submitted and accepted:

    org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job 0bb6676ed5cf22dded08018f1aca7794 from savepoint file:/opt/flink/volume/flink-cp/0bb6676ed5cf22dded08018f1aca7794/chk-15 (allowing non restored state)
    INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 'sql-runner-deploy' (0bb6676ed5cf22dded08018f1aca7794).

* If I delete the underlying JobManager deployment (the config map is now empty), the operator goes into a failure state with:

    [ERROR][sp/sql-runner-deploy] Flink recovery failed
    org.apache.flink.kubernetes.operator.exception.RecoveryFailureException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted.

I can see that the AbstractFlinkDeploymentObserver spots the missing JobManager deployment and marks the CR as being in an error state. This then triggers a reconcile, and ApplicationReconciler.reconcileOtherChanges() realizes the deployment needs to be recovered, so it triggers resubmitJob() – including requiring the HA metadata to exist, because the ApplicationCluster is configured in HA mode. That eventually trickles down to AbstractFlinkService.submitApplicationCluster(), which validates the HA metadata. This then appears to fail because the fabric8 client is filtering out the empty HA config map.

I understand that when a JM pod restarts, that’s business as usual for Flink HA – and the job therefore doesn’t restart (though it also disappears from the JM job list). Am I correct in assuming that the operator is looking for HA metadata when recovering the deployment to ensure that it doesn’t redeploy a failed job automatically? Whereas with restartNonce you are explicitly asking to restart?
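To spell out the flow as I read it: the class and method names above come from the operator source, but the body below is purely my own shorthand to summarise the behaviour, not the real implementation.

    // Illustrative shorthand only - not the actual flink-kubernetes-operator code.
    public class RecoveryFlowSketch {

        /**
         * AbstractFlinkDeploymentObserver notices the JM Deployment is gone and puts the CR
         * into an error state; the next reconcile then tries to recover it.
         */
        static void reconcileMissingJmDeployment(boolean haEnabled, boolean haMetadataAvailable) {
            // ApplicationReconciler.reconcileOtherChanges() -> resubmitJob(), which for an HA
            // cluster insists on HA metadata before calling submitApplicationCluster().
            if (haEnabled && !haMetadataAvailable) {
                throw new IllegalStateException(
                        "HA metadata not available to restore from last state");
            }
            System.out.println("submitApplicationCluster() - JM restores from HA/checkpoint data");
        }

        public static void main(String[] args) {
            // My failing case: HA enabled, but the (empty) HA ConfigMap is treated as missing.
            reconcileMissingJmDeployment(true, false);
        }
    }

If that reading is right, the redeploy is refused purely because the empty ConfigMap counts as "no HA metadata", whereas the restartNonce path never hits that check because the operator deleted the HA data itself.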
If that is the case – why does using restartNonce not recreate the missing deployment?

--
Nic Townsend
IBM Event Processing
Senior Engineer / Technical Lead

Slack: @nictownsend
Bluesky: @nict0wnsend.bsky.social

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: Building C, IBM Hursley Office, Hursley Park Road, Winchester, Hampshire SO21 2JN