Adding a bit more context to the puzzle, and tagging in Gyula based on git
blame 😊

I’ve been reading through 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
 and the associated code.

I can see that there are two config maps created - 
<deployment>-cluster-config-map and <deployment>-<jobid>-config-map.

The former holds the HA metadata for the JobManagers, and the latter the HA
metadata for a specific job.
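
To double-check that, this is roughly how I have been inspecting the two ConfigMaps with a plain fabric8 client. It is only a minimal sketch: the namespace, deployment name and job id are placeholders from my environment and logs, not anything the operator mandates.

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class HaConfigMapCheck {
    public static void main(String[] args) {
        // Placeholders - substitute your own namespace, deployment name and job id.
        String namespace = "flink";
        String deployment = "sql-runner-deploy";
        String jobId = "0bb6676ed5cf22dded08018f1aca7794";

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            ConfigMap clusterCm = client.configMaps().inNamespace(namespace)
                    .withName(deployment + "-cluster-config-map").get();
            ConfigMap jobCm = client.configMaps().inNamespace(namespace)
                    .withName(deployment + "-" + jobId + "-config-map").get();

            // A ConfigMap can exist while carrying no HA data, so report both facts.
            report("cluster HA ConfigMap", clusterCm);
            report("job HA ConfigMap", jobCm);
        }
    }

    private static void report(String label, ConfigMap cm) {
        if (cm == null) {
            System.out.println(label + ": missing");
        } else {
            int entries = cm.getData() == null ? 0 : cm.getData().size();
            System.out.println(label + ": exists, " + entries + " data entries");
        }
    }
}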

When a job fails, the <jobid>-config-map holding its HA metadata is removed, and
the job graph is removed from the <deployment>-cluster-config-map. The
job-specific ConfigMap is the one the Operator looks for when trying to recover
the JM.

If I delete a JM pod once the job fails, it seems that the new pod will create 
a FileSystemJobResultStore 
(https://github.com/apache/flink/blob/997b48340d2aac1de48c0788b4204d660e34cedd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java#L102)
 that loads completed jobs from the high-availability.storageDir. The pod then 
tries to submit the ApplicationCluster job, which fails because it is recorded 
as failed in the job result store.
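
For anyone following along, this is the sort of check I used to confirm the terminal state really is persisted on disk. It is only a sketch: it assumes FileSystemJobResultStore lays entries out as <high-availability.storageDir>/job-result-store/<cluster-id>/<job-id>*.json (my reading of the class linked above), and the paths below are placeholders for my volume mount.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Lists whatever the job result store has persisted for a cluster, to confirm a
// job is already recorded as globally terminal before a new JM pod comes up.
public class JobResultStoreLs {
    public static void main(String[] args) throws IOException {
        // Assumed layout: <high-availability.storageDir>/job-result-store/<cluster-id>/
        // Both the base dir and the cluster id are placeholders for a local mount.
        Path resultStoreDir =
                Paths.get("/opt/flink/volume/flink-ha", "job-result-store", "sql-runner-deploy");

        if (!Files.isDirectory(resultStoreDir)) {
            System.out.println("No job result store directory at " + resultStoreDir);
            return;
        }
        try (Stream<Path> entries = Files.list(resultStoreDir)) {
            // One entry per terminated job; a dirty suffix just means cleanup had not
            // finished yet. Either way the job reached a globally terminal state.
            entries.forEach(p -> System.out.println(p.getFileName()));
        }
    }
}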

The JobManager also appears to create a CompletedCheckpointStore 
(https://github.com/apache/flink/blob/997b48340d2aac1de48c0788b4204d660e34cedd/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java#L91)
 which loads from the HA metadata and also falls back to the file system 
(https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L308).
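
As I read it, those ConfigMap entries are only pointers, and the checkpoint data itself sits under high-availability.storageDir, so both pieces matter for recovery. Something like the following dumps which checkpoints the job's HA ConfigMap still references; the "checkpointID-" key prefix and the ConfigMap name are my assumptions from reading the code, not a documented contract.

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

// Prints the checkpoint keys held in the job's HA ConfigMap. Each value is a
// serialized pointer; the actual checkpoint lives as files on the shared storage.
public class CheckpointPointerDump {
    public static void main(String[] args) {
        String namespace = "flink";
        // Placeholder name following the <deployment>-<jobid>-config-map pattern above.
        String jobConfigMap = "sql-runner-deploy-0bb6676ed5cf22dded08018f1aca7794-config-map";

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            ConfigMap cm = client.configMaps().inNamespace(namespace).withName(jobConfigMap).get();
            if (cm == null || cm.getData() == null) {
                System.out.println("No job HA ConfigMap, or it has no data");
                return;
            }
            cm.getData().keySet().stream()
                    .filter(key -> key.startsWith("checkpointID-")) // assumed key prefix
                    .sorted()
                    .forEach(System.out::println);
        }
    }
}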

As a result, it seems that a JobManager pod deleted after a failed 
ApplicationCluster job can restore to a working state (insofar as there is no 
running job that had previously failed).

From the Operator perspective - I can see that 
https://issues.apache.org/jira/browse/FLINK-30437 resulted in changing the 
check to require the HA job metadata before deploying an ApplicationCluster, 
but it's not clear exactly why. At face value it looks like the underlying 
JobManagers can recover from disk if the HA metadata is missing.

--

Nic Townsend
IBM Event Processing
Senior Engineer / Technical Lead

Slack: @nictownsend
Bluesky: @nict0wnsend.bsky.social



From: Nic Townsend <nictowns...@uk.ibm.com>
Date: Tuesday, 14 January 2025 at 19:13
To: user@flink.apache.org <user@flink.apache.org>
Subject: [EXTERNAL] Re: Restoring lost JobManager deployment

Thinking a bit more - is this a bug with fabric8?

My assumption is that the presence of the config map means the JM will try to 
load HA, and that without the map it starts the job from the savepoint dir.

The config map exists - but the fabric8 configmap check is returning false, so 
the operator won't create the deployment.

If the operator were told the config map existed, then the JM deployment could 
be recreated and it would behave the same as if the pod had restarted.

The only caveat, I assume, is that when the deployment was deleted we updated 
spec.error, so the original job error would be lost.


________________________________
From: Nic Townsend <nictowns...@uk.ibm.com>
Sent: 14 January 2025 17:48
To: user@flink.apache.org <user@flink.apache.org>
Subject: [EXTERNAL] Restoring lost JobManager deployment


Hi all,



I’m looking for some help understanding a behaviour of the k8s operator when 
recovering a JobManager deployment.



The scenario is an ApplicationCluster with HA enabled and an upgrade mode of 
savepoint. The job is in a failed state, so it's not in the HA config map.



  *   If I delete the JM pod, the leader information is removed from the HA 
config map (so it's now empty). k8s creates a new pod, and when it starts, the 
application job is submitted and fails because the persisted HA data records 
that the job reached a failed state, even with the HA metadata missing.



WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring 
JobGraph submission 'sql-runner-deploy' (0bb6676ed5cf22dded08018f1aca7794) 
because the job already reached a globally-terminal state (i.e. FAILED, 
CANCELED, FINISHED) in a previous execution.



  *   If I change the spec of a failed job or use restartNonce, the operator 
will delete the HA config map, then delete and recreate the JM deployment. The 
ApplicationCluster then resumes from the checkpoint. My guess is that using the 
restart nonce tells the operator not to look for HA metadata, because it knows 
it deleted the CM itself. The JobManager clears out persisted HA data on 
startup if the CM is missing, and the job can be re-submitted and accepted.



org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job 
0bb6676ed5cf22dded08018f1aca7794 from savepoint 
file:/opt/flink/volume/flink-cp/0bb6676ed5cf22dded08018f1aca7794/chk-15 
(allowing non restored state



INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting 
job 'sql-runner-deploy' (0bb6676ed5cf22dded08018f1aca7794).



  *   If I delete the underlying JobManager deployment (config map is now 
empty), the operator goes into a failure state with:



[ERROR][sp/sql-runner-deploy] Flink recovery failed



org.apache.flink.kubernetes.operator.exception.RecoveryFailureException: HA 
metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted.



I can see that the AbstractFlinkDeploymentObserver spots the missing JobManager 
deployment and puts the CR into an error state. This then triggers a 
reconcile, and ApplicationReconciler.reconcileOtherChanges() realizes the 
deployment needs to be recovered, so it triggers resubmitJob(), which requires 
HA metadata to exist because the ApplicationCluster is configured in HA mode.



That eventually trickles down to 
AbstractFlinkService.submitApplicationCluster(), which will validate HA 
metadata. This then appears to fail because the fabric8 client is filtering out 
the empty HA config map.
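
To illustrate what I mean, here is my approximation of that kind of check with a plain fabric8 client. It is not the operator's actual code; the label keys are the ones I believe Flink's native Kubernetes HA puts on its ConfigMaps, and the namespace/cluster id are placeholders.

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

import java.util.List;

// Approximation of an "is HA metadata available?" probe: find the ConfigMaps that
// Kubernetes HA labels for this cluster and require at least one to carry data.
public class HaMetadataProbe {
    public static void main(String[] args) {
        String namespace = "flink";
        String clusterId = "sql-runner-deploy"; // placeholder deployment name

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            List<ConfigMap> haConfigMaps = client.configMaps()
                    .inNamespace(namespace)
                    .withLabel("app", clusterId)
                    .withLabel("configmap-type", "high-availability") // assumed HA label
                    .list()
                    .getItems();

            boolean haMetadataAvailable = haConfigMaps.stream()
                    .anyMatch(cm -> cm.getData() != null && !cm.getData().isEmpty());

            // An existing-but-empty ConfigMap fails this probe, which matches what I am
            // seeing: the ConfigMap is there, yet recovery reports no HA metadata.
            System.out.println("HA ConfigMaps found: " + haConfigMaps.size()
                    + ", metadata available: " + haMetadataAvailable);
        }
    }
}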



I understand that when a JM pod restarts, that's business as usual for Flink HA, 
and the job therefore doesn't restart (though it also disappears from the JM 
job list). Am I correct in assuming that the operator looks for HA metadata 
when recovering the deployment to ensure that it doesn't redeploy a failed job 
automatically, whereas with restartNonce you are explicitly asking for a restart?



If that is the case – why does using restartNonce not recreate the missing 
deployment?

--

Nic Townsend
IBM Event Processing
Senior Engineer / Technical Lead

Slack: @nictownsend
Bluesky: @nict0wnsend.bsky.social




Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: Building C, IBM Hursley Office, Hursley Park Road, 
Winchester, Hampshire SO21 2JN
