Thank you, Yang. That was it! Specifying "--fromSavepoint" and "--allowNonRestoredState" for "run-application" together did the trick.
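For reference, here is a minimal sketch of the kind of invocation described above, with both savepoint flags passed together on the CLI. The savepoint path, jar location, and cluster id are hypothetical placeholders, not values from this thread (the cluster id is only guessed from the configmap name quoted below). The script assembles and prints the command rather than submitting it.

```shell
# Hypothetical placeholders: substitute your own values.
SAVEPOINT_PATH="s3://my-bucket/savepoints/savepoint-abc123"  # hypothetical
JOB_JAR="local:///opt/flink/usrlib/my-job.jar"               # hypothetical
CLUSTER_ID="flink-test"                                      # assumed from the configmap name

# Build the argument list. --fromSavepoint and --allowNonRestoredState are
# passed together as CLI flags; other settings still go in as -D options.
set -- flink run-application \
  -t kubernetes-application \
  --fromSavepoint "$SAVEPOINT_PATH" \
  --allowNonRestoredState \
  -Dkubernetes.cluster-id="$CLUSTER_ID" \
  -Dstate.checkpoints.num-retained=100 \
  "$JOB_JAR"

# Print the assembled command for inspection.
FLINK_CMD="$*"
echo "$FLINK_CMD"

# To actually submit the job, run: "$@"
```

After submitting, the check quoted below (`kc describe cm/flink-config-flink-test | grep ignore`) can be used to see whether the setting reached the configmap.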
I was a bit confused because "flink run-application --help" only lists the "--executor" and "--target" options, so I assumed everything else had to be passed as -D params. I had tried passing "--allowNonRestoredState" on the CLI only as a last resort and didn't think to combine it with "--fromSavepoint".

Thanks again!

On Sun, Feb 20, 2022 at 9:49 PM Yang Wang <danrtsey...@gmail.com> wrote:

> By design, we should support arbitrary config keys via the CLI when using
> generic CLI mode.
>
> Have you also specified "--fromSavepoint" along with
> "--allowNonRestoredState" when submitting a Flink job via "flink
> run-application"?
>
> From the current code base, it seems that the CLI options (e.g.
> --fromSavepoint, --allowNonRestoredState) have higher priority than Flink
> config options.
> As a result, the savepoint-related config options are wrongly
> overwritten. Refer to the implementation[1].
>
> [1].
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
>
>
> Best,
> Yang
>
> On Sat, Feb 19, 2022 at 08:30, Andrey Bulgakov <m...@andreiko.ru> wrote:
>
>> Hi Austin,
>>
>> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>>
>> But for what it's worth, I'm setting a few options unrelated to
>> kubernetes this way and they all have effect:
>> -Dstate.checkpoints.num-retained=100 \
>> -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
>> -Dio.tmp.dirs=/data/flink-local-data \
>> -Dqueryable-state.enable=true \
>>
>> The only one I'm having problems with is
>> "execution.savepoint.ignore-unclaimed-state".
>>
>> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Andrey,
>>>
>>> It's unclear to me from the docs[1] if the flink native-kubernetes
>>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
>>> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>>>
>>> Best,
>>> Austin
>>>
>>> [1]:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>>>
>>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm working on migrating our Flink job away from Hadoop session mode to
>>>> K8S application mode.
>>>> It's been going great so far but I'm hitting a wall with this seemingly
>>>> simple thing.
>>>>
>>>> In the first phase of the migration I want to remove some operators
>>>> (their state can be discarded) and focus on getting the primary pipeline
>>>> running first.
>>>> For that I have to start the cluster from a savepoint with the
>>>> "allowNonRestoredState" parameter turned on.
>>>>
>>>> The problem is that I can't set it in any way that I'm aware of. I
>>>> tried 4 ways separately and simultaneously:
>>>>
>>>> 1) Adding --allowNonRestoredState to flink run-application
>>>>    -t kubernetes-application
>>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>>>>    run-application -t kubernetes-application
>>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my
>>>>    local flink-conf.yaml where I'm running flink run-application
>>>> 4) Overriding it in the application code:
>>>>    val sigh = new Configuration()
>>>>    sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>>>>    env.configure(sigh)
>>>>
>>>> Every time the resulting pod ends up with "false" value for this
>>>> setting in its configmap:
>>>> $ kc describe cm/flink-config-flink-test | grep ignore
>>>> execution.savepoint.ignore-unclaimed-state: false
>>>>
>>>> And I get the exception:
>>>> java.lang.IllegalStateException: Failed to rollback to
>>>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
>>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>>>> operator is not available in the new program. If you want to allow to skip
>>>> this, you can set the --allowNonRestoredState option on the CLI.
>>>>
>>>> It seems like something overrides it to false and it never has any
>>>> effect.
>>>>
>>>> Can this be a bug or am I doing something wrong?
>>>>
>>>> For context, the savepoint is produced by Flink 1.8.2 and the version
>>>> I'm trying to run on K8S is 1.14.3.
>>>>
>>>> --
>>>> With regards,
>>>> Andrey Bulgakov
>>>>
>>
>> --
>> With regards,
>> Andrey Bulgakov
>>
>

--
With regards,
Andrey Bulgakov