I wanted to talk about an issue we've hit recently with Flink Kubernetes Operator 1.6.1 and Flink 1.17.1.
We're using the savepoint upgrade mode, and we ran into cases where lastSavepoint in the status doesn't seem to update (still digging into why; could it be an exception when cancelling tasks?). This ends up making Flink restore from an outdated savepoint. Since there is already a job.upgrade.last-state.max.allowed.checkpoint.age option, could we add a similar option for savepoints as well? That way, if the last savepoint is too old, the upgrade would either fail with an error or fall back to the latest checkpoint (i.e. last-state). Or is there a better solution, maybe? Would love to hear your thoughts or any other ideas you might have. Thanks!
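
P.S. To make the proposal a bit more concrete, here is a rough sketch of the check I have in mind. This is not operator code: the class, the method, the enum, and the two settings (a maximum allowed savepoint age and a "fall back to last-state" switch) are all hypothetical names I made up for illustration, and the real implementation would presumably live wherever the operator picks the restore point during an upgrade.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch only; none of these names exist in the operator.
final class SavepointAgeCheck {

    /** Possible outcomes when an upgrade is about to restore from the last savepoint. */
    enum Decision { USE_SAVEPOINT, FALLBACK_TO_LAST_STATE, FAIL }

    /**
     * Decides what to do with the last recorded savepoint.
     *
     * @param lastSavepointTime when the savepoint recorded in the status was taken
     * @param now               current time (passed in so the logic is easy to test)
     * @param maxAllowedAge     proposed max savepoint age; null means "not configured"
     * @param fallbackEnabled   proposed switch: use the latest checkpoint instead of failing
     */
    static Decision decide(Instant lastSavepointTime, Instant now,
                           Duration maxAllowedAge, boolean fallbackEnabled) {
        if (maxAllowedAge == null) {
            // Option not set: keep today's behavior and restore from the savepoint.
            return Decision.USE_SAVEPOINT;
        }
        Duration age = Duration.between(lastSavepointTime, now);
        if (age.compareTo(maxAllowedAge) <= 0) {
            // Savepoint is recent enough.
            return Decision.USE_SAVEPOINT;
        }
        // Savepoint is too old: either fall back to last-state or surface an error.
        return fallbackEnabled ? Decision.FALLBACK_TO_LAST_STATE : Decision.FAIL;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        Instant staleSavepoint = now.minus(Duration.ofHours(6));
        // A 6-hour-old savepoint checked against a 1-hour limit, with fallback enabled.
        System.out.println(decide(staleSavepoint, now, Duration.ofHours(1), true));
    }
}
```

Leaving the age limit unset would keep the current behavior, so the option would be opt-in.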