Thank you, Yang. That was it! Specifying "--fromSavepoint" and
"--allowNonRestoredState" for "run-application" together did the trick.
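
A minimal sketch of such an invocation, for reference. The savepoint path, image name, and jar location are placeholders rather than values from this thread, and the cluster id "flink-test" is only inferred from the configmap name quoted further down. The script prints the command by default and runs it only when RUN=1 is set.

```shell
#!/bin/sh
# Placeholder values (assumptions, not taken from the thread).
SAVEPOINT_PATH="${SAVEPOINT_PATH:-s3a://my-bucket/savepoints/savepoint-placeholder}"
IMAGE="${IMAGE:-my-registry/my-flink-job:latest}"
JOB_JAR="${JOB_JAR:-local:///opt/flink/usrlib/my-job.jar}"

# Build the argument list. Both savepoint flags are passed together
# as CLI options rather than as -D config keys.
set -- run-application \
    --target kubernetes-application \
    --fromSavepoint "$SAVEPOINT_PATH" \
    --allowNonRestoredState \
    -Dkubernetes.cluster-id=flink-test \
    -Dkubernetes.container.image="$IMAGE" \
    "$JOB_JAR"

# Dry run by default: print the command. Set RUN=1 to actually submit.
if [ "${RUN:-0}" = "1" ]; then
    flink "$@"
else
    echo "flink $*"
fi
```

Other settings can still go in as -D params next to these two flags.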

I was a bit confused, because when you run "flink run-application --help",
it only tells you about the "--executor" and "--target" options. So I
assumed I should pass everything else as -D params. I had only tried
passing "--allowNonRestoredState" on the CLI as a last resort but didn't
think to do it together with "--fromSavepoint".

Thanks again!

On Sun, Feb 20, 2022 at 9:49 PM Yang Wang <danrtsey...@gmail.com> wrote:

> By design, we should support arbitrary config keys via the CLI when using
> generic CLI mode.
>
> Have you also specified "--fromSavepoint" along with
> "--allowNonRestoredState" when submitting a Flink job via "flink
> run-application"?
>
> From the current code base, it seems that the CLI options (e.g.
> --fromSavepoint, --allowNonRestoredState) have higher priority than Flink
> config options.
> As a result, the savepoint-related config options are wrongly
> overwritten. Refer to the implementation[1].
>
> [1].
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
>
>
> Best,
> Yang
>
> Andrey Bulgakov <m...@andreiko.ru> wrote on Sat, Feb 19, 2022 at 08:30:
>
>> Hi Austin,
>>
>> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>>
>> But for what it's worth, I'm setting a few options unrelated to
>> kubernetes this way and they all have effect:
>>     -Dstate.checkpoints.num-retained=100 \
>>     -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
>>     -Dio.tmp.dirs=/data/flink-local-data \
>>     -Dqueryable-state.enable=true \
>>
>> The only one I'm having problems with is
>> "execution.savepoint.ignore-unclaimed-state".
>>
>> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Andrey,
>>>
>>> It's unclear to me from the docs[1] if the flink native-kubernetes
>>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
>>> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>>>
>>> Best,
>>> Austin
>>>
>>> [1]:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>>>
>>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I'm working on migrating our Flink job away from Hadoop session mode to
>>>> K8S application mode.
>>>> It's been going great so far but I'm hitting a wall with this seemingly
>>>> simple thing.
>>>>
>>>> In the first phase of the migration I want to remove some operators
>>>> (their state can be discarded) and focus on getting the primary pipeline
>>>> running first.
>>>> For that I have to start the cluster from a savepoint with the
>>>> "allowNonRestoredState" parameter turned on.
>>>>
>>>> The problem is that I can't set it in any way that I'm aware of. I
>>>> tried 4 ways separately and simultaneously:
>>>>
>>>> 1) Adding --allowNonRestoredState to flink run-application
>>>> -t kubernetes-application
>>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>>>> run-application -t kubernetes-application
>>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my
>>>> local flink-conf.yaml where I'm running flink run-application
>>>> 4) Overriding it in the application code:
>>>>     val sigh = new Configuration()
>>>>     sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>>>>     env.configure(sigh)
>>>>
>>>> Every time the resulting pod ends up with a "false" value for this
>>>> setting in its configmap:
>>>> $ kc describe cm/flink-config-flink-test | grep ignore
>>>> execution.savepoint.ignore-unclaimed-state: false
>>>>
>>>> And I get the exception:
>>>> java.lang.IllegalStateException: Failed to rollback to
>>>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
>>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>>>> operator is not available in the new program. If you want to allow to skip
>>>> this, you can set the --allowNonRestoredState option on the CLI.
>>>>
>>>> It seems like something overrides it to false and it never has any
>>>> effect.
>>>>
>>>> Can this be a bug or am I doing something wrong?
>>>>
>>>> For context, the savepoint is produced by Flink 1.8.2 and the version
>>>> I'm trying to run on K8S is 1.14.3.
>>>>
>>>> --
>>>> With regards,
>>>> Andrey Bulgakov
>>>>
>>>>
>>
>> --
>> With regards,
>> Andrey Bulgakov
>>
>

-- 
With regards,
Andrey Bulgakov
