[ https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nicolas Fraison updated FLINK-32890:
------------------------------------
    Description: 
Here are all the details about the issue (based on an app/scenario reproducing it):
* Deployed a new release of a Flink app with a new operator in its job graph
* The Flink Kubernetes Operator marked the app as stable
* After some time, the app failed and stayed in the failed state (due to an issue with our Kafka clusters)
* Unfortunately, the team in charge of this Flink app decided to roll it back, believing the failure was linked to the new deployment
* The operator detected {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} It therefore relied on last-state (as we do not disable the fallback), and no savepoint was taken
* Flink started the JobManager and deployed the app. It did find the 3 checkpoints:
* {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}}
* {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
* {{Recovering checkpoints from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
* {{Found 3 checkpoints in ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
* {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}
* The job failed because the rolled-back version no longer contains the new operator whose state is stored in checkpoint 19:
{code:java}
Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state f298e8715b4d85e6f965b60e1c848cbe
{code}
* {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in the JobResultStore after reaching a terminal state.}}
* {{Clean up the high availability data for job 6b24a364c1905e924a69f3dbff0d26a6.}}
* {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}
* The JobManager restarted and tried to resubmit the job, but since the job had already reached a terminal state, the submission was ignored and the application finished:
* {{Received JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
* {{Ignoring JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.}}
* {{Application completed SUCCESSFULLY}}
* Finally, the operator rolled back the deployment and still reported {{Job is not running but HA metadata is available for last state restore, ready for upgrade}}
* However, the job's HA metadata no longer existed (it had been cleaned up earlier):
{noformat}
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
Path /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints doesn't exist
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
jobgraphs
jobs
leader
{noformat}

The app rolled back by the Flink operator therefore restored from the last available savepoint, since no HA metadata or checkpoints were left.
But this savepoint was an old one, because during the upgrade the operator had relied on last-state instead of taking a new savepoint.

> Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32890
>                 URL: https://issues.apache.org/jira/browse/FLINK-32890
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: Nicolas Fraison
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)