Hi Austin,

Thanks for the reply! Yeah, the docs aren't super explicit about this.

But for what it's worth, I'm setting a few options unrelated to Kubernetes
this way and they all take effect:
    -Dstate.checkpoints.num-retained=100 \
    -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
    -Dio.tmp.dirs=/data/flink-local-data \
    -Dqueryable-state.enable=true \

The only one I'm having problems with is
"execution.savepoint.ignore-unclaimed-state".
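
For reference, here is a minimal sketch of how these options are assembled on the command line. The cluster ID and jar path are hypothetical placeholders for illustration, and the script only prints the command instead of submitting anything:

```shell
# Dry run: assemble the flink arguments and print the command without executing it.
# CLUSTER_ID and JOB_JAR are hypothetical placeholders, not real values.
CLUSTER_ID="flink-test"
JOB_JAR="local:///opt/flink/usrlib/job.jar"

# Every setting is passed as a -Dkey=value dynamic property.
set -- run-application -t kubernetes-application \
    "-Dkubernetes.cluster-id=${CLUSTER_ID}" \
    -Dstate.checkpoints.num-retained=100 \
    -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
    -Dio.tmp.dirs=/data/flink-local-data \
    -Dqueryable-state.enable=true \
    -Dexecution.savepoint.ignore-unclaimed-state=true \
    "${JOB_JAR}"

FLINK_CMD="flink $*"
echo "${FLINK_CMD}"
```

Replacing the echo with flink "$@" would submit the job. The last -D flag before the jar is the only one that does not show up in the resulting configmap.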

On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Andrey,
>
> It's unclear to me from the docs[1] if the flink native-kubernetes
> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>
> Best,
> Austin
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>
> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru> wrote:
>
>> Hey all,
>>
>> I'm working on migrating our Flink job away from Hadoop session mode to
>> K8S application mode.
>> It's been going great so far but I'm hitting a wall with this seemingly
>> simple thing.
>>
>> In the first phase of the migration I want to remove some operators
>> (their state can be discarded) and focus on getting the primary pipeline
>> running first.
>> For that I have to start the cluster from a savepoint with the
>> "allowNonRestoredState" parameter turned on.
>>
>> The problem is that I can't set it in any way that I'm aware of. I tried
>> 4 ways separately and simultaneously:
>>
>> 1) Adding --allowNonRestoredState to flink run-application
>> -t kubernetes-application
>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>> run-application -t kubernetes-application
>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
>> flink-conf.yaml where I'm running flink run-application
>> 4) Overriding it in the application code:
>>     val sigh = new Configuration()
>>     sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>>     env.configure(sigh)
>>
>> Every time the resulting pod ends up with "false" value for this setting
>> in its configmap:
>> $ kc describe cm/flink-config-flink-test | grep ignore
>> execution.savepoint.ignore-unclaimed-state: false
>>
>> And I get the exception:
>> java.lang.IllegalStateException: Failed to rollback to
>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>> operator is not available in the new program. If you want to allow to skip
>> this, you can set the --allowNonRestoredState option on the CLI.
>>
>> It seems like something overrides it to false and it never has any effect.
>>
>> Can this be a bug or am I doing something wrong?
>>
>> For context, the savepoint is produced by Flink 1.8.2 and the version I'm
>> trying to run on K8S is 1.14.3.
>>
>> --
>> With regards,
>> Andrey Bulgakov
>>
>>

-- 
With regards,
Andrey Bulgakov
