The config options configured by -D param should take effect. It is also the recommended way instead of CLI options(e.g. --fromSavepoint). Not only the K8s application, it also does not work for yarn application and yarn per-job mode. I believe it is indeed a bug in the current implementation and have created a ticket for this[1].
After then you could start the Flink k8s application via the following command. *$FLINK_HOME/bin/flink run-application -t kubernetes-application \-Dkubernetes.cluster-id=$CLUSTER_ID \-Dkubernetes.namespace=$NAMESPACE \-Dkubernetes.container.image=$IMAGE \-Dexecution.savepoint.ignore-unclaimed-state=true -Dexecution.savepoint.path=oss://flink-debug-yiqi/flink-ha \local:///opt/flink/examples/streaming/StateMachineExample.jar* If you still want to use the CLI options, then I expect at least you need to set "--fromSavepoint". [1]. https://issues.apache.org/jira/browse/FLINK-26316 Best, Yang Andrey Bulgakov <m...@andreiko.ru> 于2022年2月23日周三 04:09写道: > Thank you, Yang. That was it! Specifying "--fromSavepoint" and > "--allowNonRestoredState" for "run-application" together did the trick. > > I was a bit confused, because when you run "flink run-application --help", > it only tells you about the "--executor" and "--target" options. So I > assumed I should pass everything else as -D params. I had only tried > passing "--allowNonRestoredState" on the CLI as the last resort but didn't > think to do it together with "--fromSavepoint". > > Thanks again! > > On Sun, Feb 20, 2022 at 9:49 PM Yang Wang <danrtsey...@gmail.com> wrote: > >> By design, we should support arbitrary config keys via the CLI when using >> generic CLI mode. >> >> Do you have also specified the "--fromSavepoint" along with >> "--allowNonRestoredState" when submitting a Flink job via "flink >> run-application"? >> >> From the current code base, it seems that the CLI options(e.g >> --fromSavepoint, --allowNonRestoredState) have higher priority than Flink >> config options. >> And it will make the savepoint related config options are overwritten >> wrongly. Refer to the implementation[1]. >> >> [1]. >> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181 >> >> >> Best, >> Yang >> >> Andrey Bulgakov <m...@andreiko.ru> 于2022年2月19日周六 08:30写道: >> >>> Hi Austin, >>> >>> Thanks for the reply! Yeah, the docs aren't super explicit about this. >>> >>> But for what it's worth, I'm setting a few options unrelated to >>> kubernetes this way and they all have effect: >>> -Dstate.checkpoints.num-retained=100 \ >>> >>> -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider >>> \ >>> -Dio.tmp.dirs=/data/flink-local-data \ >>> -Dqueryable-state.enable=true \ >>> >>> The only one i'm having problems with is >>> "execution.savepoint.ignore-unclaimed-state". >>> >>> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards < >>> austin.caw...@gmail.com> wrote: >>> >>>> Hi Andrey, >>>> >>>> It's unclear to me from the docs[1] if the flink native-kubernetes >>>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing >>>> Yang Wang, who has worked a lot in this area and can hopefully help us out. >>>> >>>> Best, >>>> Austin >>>> >>>> [1]: >>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes >>>> >>>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru> >>>> wrote: >>>> >>>>> Hey all, >>>>> >>>>> I'm working on migrating our Flink job away from Hadoop session mode >>>>> to K8S application mode. >>>>> It's been going great so far but I'm hitting a wall with this >>>>> seemingly simple thing. >>>>> >>>>> In the first phase of the migration I want to remove some operators >>>>> (their state can be discarded) and focus on getting the primary pipeline >>>>> running first. >>>>> For that I have to start the cluster from a savepoint with the >>>>> "allowNonRestoredState" parameter turned on. >>>>> >>>>> The problem is that I can't set it in any way that I'm aware of. I >>>>> tried 4 ways separately and simultaneously: >>>>> >>>>> 1) Adding --allowNonRestoredState to flink run-application >>>>> -t kubernetes-application >>>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink >>>>> run-application -t kubernetes-application >>>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my >>>>> local flink-conf.yaml where I'm running flink run-application >>>>> 4) Overriding it in the application code: >>>>> val sigh = new Configuration() >>>>> >>>>> sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, >>>>> true) >>>>> env.configure(sigh) >>>>> >>>>> Every time the resulting pod ends up with "false" value for this >>>>> setting in its configmap: >>>>> $ kc describe cm/flink-config-flink-test | grep ignore >>>>> execution.savepoint.ignore-unclaimed-state: false >>>>> >>>>> And I get the exception: >>>>> java.lang.IllegalStateException: Failed to rollback to >>>>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for >>>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the >>>>> operator is not available in the new program. If you want to allow to skip >>>>> this, you can set the --allowNonRestoredState option on the CLI. >>>>> >>>>> It seems like something overrides it to false and it never has any >>>>> effect. >>>>> >>>>> Can this be a bug or am I doing something wrong? >>>>> >>>>> For context, the savepoint is produced by Flink 1.8.2 and the version >>>>> I'm trying to run on K8S is 1.14.3. >>>>> >>>>> -- >>>>> With regards, >>>>> Andrey Bulgakov >>>>> >>>>> >>> >>> -- >>> With regards, >>> Andrey Bulgakov >>> >> > > -- > With regards, > Andrey Bulgakov >