Hey all, I'm working on migrating our Flink job away from Hadoop session mode to K8S application mode. It's been going great so far, but I've hit a wall with something that seems like it should be simple.
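For reference, this is a sketch of the rough shape of a Kubernetes application-mode submission that resumes from a savepoint. It is not a tested recipe. The cluster id, container image, savepoint path, and jar path are all hypothetical placeholders. The script passes both the --allowNonRestoredState CLI flag and the equivalent execution.savepoint.ignore-unclaimed-state key, which are the two CLI-side settings discussed in this post. By default it only prints the assembled command.

```shell
#!/usr/bin/env bash
# Sketch only: every value below is a hypothetical placeholder, not a real deployment.
set -euo pipefail

CLUSTER_ID="${CLUSTER_ID:-flink-test}"                        # hypothetical cluster id
IMAGE="${IMAGE:-my-registry/my-flink-job:1.14.3}"             # hypothetical image
SAVEPOINT_PATH="${SAVEPOINT_PATH:-s3://my-bucket/savepoints/savepoint-placeholder}"  # hypothetical
JAR="${JAR:-local:///opt/flink/usrlib/my-flink-job.jar}"      # hypothetical jar inside the image

build_submit_cmd() {
  # Passes both the CLI flag and the equivalent config key, mirroring
  # the first two approaches described in the post.
  local cmd=(
    flink run-application
    -t kubernetes-application
    "-Dkubernetes.cluster-id=${CLUSTER_ID}"
    "-Dkubernetes.container.image=${IMAGE}"
    "-Dexecution.savepoint.ignore-unclaimed-state=true"
    --fromSavepoint "${SAVEPOINT_PATH}"
    --allowNonRestoredState
    "${JAR}"
  )
  if [[ "${DRY_RUN:-1}" == "1" ]]; then
    # Default: print the assembled command without running it.
    printf '%s\n' "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}

build_submit_cmd
```

Setting DRY_RUN=0 runs the command instead of printing it. That requires the flink CLI on the PATH and access to the target Kubernetes cluster.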
In the first phase of the migration I want to remove some operators (their state can be discarded) and focus on getting the primary pipeline running first. For that I have to start the cluster from a savepoint with the "allowNonRestoredState" parameter turned on. The problem is that none of the ways I know of to set it seem to have any effect. I tried four approaches, both individually and all together:

1) Adding --allowNonRestoredState to flink run-application -t kubernetes-application
2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink run-application -t kubernetes-application
3) Adding "execution.savepoint.ignore-unclaimed-state: true" to the local flink-conf.yaml on the machine where I run flink run-application
4) Overriding it in the application code:

        import org.apache.flink.configuration.Configuration
        import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
        val sigh = new Configuration()
        sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
        env.configure(sigh) // env: the job's StreamExecutionEnvironment

Every time, the resulting pod ends up with "false" for this setting in its ConfigMap:

    $ kc describe cm/flink-config-flink-test | grep ignore
    execution.savepoint.ignore-unclaimed-state: false

And I get this exception:

    java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for operator 68895e9129981bfc6d96d1dad715298e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

It looks as if something overrides the value to false, so my setting never takes effect. Could this be a bug, or am I doing something wrong?

For context, the savepoint was produced by Flink 1.8.2, and the version I'm trying to run on K8S is 1.14.3.

--
With regards,
Andrey Bulgakov