Hi Austin, Thanks for the reply! Yeah, the docs aren't super explicit about this.
But for what it's worth, I'm setting a few options unrelated to kubernetes this way and they all have effect: -Dstate.checkpoints.num-retained=100 \ -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \ -Dio.tmp.dirs=/data/flink-local-data \ -Dqueryable-state.enable=true \ The only one i'm having problems with is "execution.savepoint.ignore-unclaimed-state". On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Hi Andrey, > > It's unclear to me from the docs[1] if the flink native-kubernetes > integration supports setting arbitrary config keys via the CLI. I'm cc'ing > Yang Wang, who has worked a lot in this area and can hopefully help us out. > > Best, > Austin > > [1]: > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes > > On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru> wrote: > >> Hey all, >> >> I'm working on migrating our Flink job away from Hadoop session mode to >> K8S application mode. >> It's been going great so far but I'm hitting a wall with this seemingly >> simple thing. >> >> In the first phase of the migration I want to remove some operators >> (their state can be discarded) and focus on getting the primary pipeline >> running first. >> For that I have to start the cluster from a savepoint with the >> "allowNonRestoredState" parameter turned on. >> >> The problem is that I can't set it in any way that I'm aware of. I tried >> 4 ways separately and simultaneously: >> >> 1) Adding --allowNonRestoredState to flink run-application >> -t kubernetes-application >> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink >> run-application -t kubernetes-application >> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local >> flink-conf.yaml where I'm running flink run-application >> 4) Overriding it in the application code: >> val sigh = new Configuration() >> sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, >> true) >> env.configure(sigh) >> >> Every time the resulting pod ends up with "false" value for this setting >> in its configmap: >> $ kc describe cm/flink-config-flink-test | grep ignore >> execution.savepoint.ignore-unclaimed-state: false >> >> And I get the exception: >> java.lang.IllegalStateException: Failed to rollback to >> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for >> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the >> operator is not available in the new program. If you want to allow to skip >> this, you can set the --allowNonRestoredState option on the CLI. >> >> It seems like something overrides it to false and it never has any effect. >> >> Can this be a bug or am I doing something wrong? >> >> For context, the savepoint is produced by Flink 1.8.2 and the version I'm >> trying to run on K8S is 1.14.3. >> >> -- >> With regards, >> Andrey Bulgakov >> >> -- With regards, Andrey Bulgakov