Ruibin Xing created FLINK-32520:
-----------------------------------

             Summary: FlinkDeployment recovered states from an obsolete savepoint
                 Key: FLINK-32520
                 URL: https://issues.apache.org/jira/browse/FLINK-32520
             Project: Flink
          Issue Type: New Feature
          Components: Kubernetes Operator
    Affects Versions: 1.13.1
            Reporter: Ruibin Xing
         Attachments: flink_kubernetes_operator_0615.csv
Kubernetes Operator version: 1.5.0

While upgrading one of our Flink jobs, we found that it recovered from a savepoint created by an earlier version of the job. The timeline was as follows:
# I upgraded the job for the first time. The job created a savepoint and successfully restored from it.
# The job ran fine and created several checkpoints.
# Later, I performed a second upgrade. Soon after submission, and before the JobManager stopped, I realized I had made a mistake in the spec, so I quickly performed a third upgrade.
# After the job started, I found that it had recovered from the savepoint created during the first upgrade.

It appears that there was an error when submitting the third upgrade. However, after investigating the code, I am still not sure why this would cause Flink to restore from the obsolete savepoint. The related operator logs are attached.

Although I have not found the root cause, here are some possible fixes:
# Remove the {{lastSavepoint}} entry after a job has successfully restored from it.
# Add a savepoint option, similar to {{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}}: the operator should refuse to recover from the savepoint if its age exceeds the configured maximum (see the sketch after this list).
# Add a flag to the status that records the savepoint's state: set it to false when the savepoint is triggered and mark it true when it completes successfully. The job should report an error if the flag for the last savepoint is false.
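To make the intent of fix #2 concrete, here is a minimal, self-contained sketch of how such a max-age check could work. This is not actual operator code: {{SavepointAgeGuard}}, {{RecordedSavepoint}}, the 30-minute limit, and the savepoint path are all illustrative assumptions; a real implementation would read the limit from the proposed config key and hook into the operator's reconciliation logic.

{code:java}
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch of proposed fix #2: before restoring a job from the
 * last recorded savepoint, check the savepoint's age against a configurable
 * maximum and refuse the upgrade if it is too old.
 * All class and field names here are illustrative, not operator APIs.
 */
public class SavepointAgeGuard {

    /** Minimal stand-in for the savepoint metadata kept in the CR status. */
    public record RecordedSavepoint(String location, Instant triggerTime) {}

    private final Duration maxAllowedAge;

    public SavepointAgeGuard(Duration maxAllowedAge) {
        this.maxAllowedAge = maxAllowedAge;
    }

    /**
     * Returns true if the savepoint is recent enough to be used for recovery.
     * A savepoint older than the configured limit is treated as obsolete.
     */
    public boolean isUsableForRestore(RecordedSavepoint savepoint, Instant now) {
        Duration age = Duration.between(savepoint.triggerTime(), now);
        return age.compareTo(maxAllowedAge) <= 0;
    }

    public static void main(String[] args) {
        // The limit would come from a config key such as the proposed
        // kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age
        SavepointAgeGuard guard = new SavepointAgeGuard(Duration.ofMinutes(30));

        RecordedSavepoint stale = new RecordedSavepoint(
                "s3://bucket/savepoints/sp-1",          // illustrative path
                Instant.now().minus(Duration.ofHours(6)));

        if (!guard.isUsableForRestore(stale, Instant.now())) {
            // Instead of silently falling back to the old savepoint, the
            // reconciler would fail the upgrade and surface an error event.
            System.out.println("Refusing to restore from obsolete savepoint: " + stale.location());
        }
    }
}
{code}

With a guard like this in place, the third upgrade in the timeline above would have failed loudly rather than silently falling back to the savepoint taken during the first upgrade.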