The config options set via -D params should take effect, and they are also
the recommended way compared with CLI options (e.g. --fromSavepoint).
This does not only affect the K8s application mode; it also does not work
for YARN application and YARN per-job mode.
I believe it is indeed a bug in the current implementation and have created
a ticket for it[1].
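To make the failure mode concrete, here is a simplified model of the precedence problem described further down in this thread, where the CLI options overwrite the savepoint-related config options. It is a sketch inferred from that description, not Flink's actual code; the function name and its true/unset and yes/no arguments are hypothetical.

```shell
#!/usr/bin/env bash
# Simplified model (an assumption inferred from this thread; NOT Flink source)
# of how execution.savepoint.ignore-unclaimed-state was resolved by the CLI
# frontend before FLINK-26316.
#
# Usage: effective_ignore_unclaimed_state <d_value> <from_savepoint> <allow_non_restored>
#   d_value            value given via -D or flink-conf.yaml: true | false | unset
#   from_savepoint     was --fromSavepoint passed on the CLI?          yes | no
#   allow_non_restored was --allowNonRestoredState passed on the CLI?  yes | no
effective_ignore_unclaimed_state() {
  local d_value="$1" from_savepoint="$2" allow_non_restored="$3"

  # Step 1: the configuration starts out with the -D / flink-conf.yaml value
  # (the option defaults to false when unset).
  local value="false"
  if [[ "$d_value" == "true" ]]; then
    value="true"
  fi

  # Step 2: savepoint settings derived from the CLI options are applied
  # afterwards and always overwrite the key. Without --fromSavepoint they mean
  # "no savepoint", whose allowNonRestoredState is false, so a lone
  # --allowNonRestoredState is lost too.
  if [[ "$from_savepoint" == "yes" && "$allow_non_restored" == "yes" ]]; then
    value="true"
  else
    value="false"
  fi

  echo "$value"
}

# Print the effective value for each combination of -D and CLI options.
for d in unset true; do
  for fs in no yes; do
    for anr in no yes; do
      printf 'via -D: %-5s | --fromSavepoint: %-3s | --allowNonRestoredState: %-3s => %s\n' \
        "$d" "$fs" "$anr" "$(effective_ignore_unclaimed_state "$d" "$fs" "$anr")"
    done
  done
done
```

In this model, only the combination of "--fromSavepoint" and "--allowNonRestoredState" yields true, which matches what Andrey observed in the ConfigMap and in his final working command.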

Once that ticket is fixed, you could start the Flink K8s application with the
following command:

    $FLINK_HOME/bin/flink run-application -t kubernetes-application \
        -Dkubernetes.cluster-id=$CLUSTER_ID \
        -Dkubernetes.namespace=$NAMESPACE \
        -Dkubernetes.container.image=$IMAGE \
        -Dexecution.savepoint.ignore-unclaimed-state=true \
        -Dexecution.savepoint.path=oss://flink-debug-yiqi/flink-ha \
        local:///opt/flink/examples/streaming/StateMachineExample.jar


If you still want to use the CLI options, then I expect you will at least
need to set "--fromSavepoint" along with "--allowNonRestoredState".
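The CLI-option route, which Andrey later confirms works, can be wrapped like this. It is a sketch: the function name, the DRY_RUN switch and the CLUSTER_ID, NAMESPACE, IMAGE and SAVEPOINT_PATH variables are hypothetical placeholders, and the jar is the StateMachineExample from the command above.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: submit a K8s application from a savepoint using the
# CLI options --fromSavepoint and --allowNonRestoredState together.
# CLUSTER_ID, IMAGE and SAVEPOINT_PATH must be set (placeholders);
# NAMESPACE falls back to "default". Set DRY_RUN=1 to print the arguments
# one per line instead of running flink.
submit_from_savepoint() {
  local cmd=(
    "${FLINK_HOME:-/opt/flink}/bin/flink" run-application
    -t kubernetes-application
    --fromSavepoint "${SAVEPOINT_PATH:?set SAVEPOINT_PATH}"
    --allowNonRestoredState
    "-Dkubernetes.cluster-id=${CLUSTER_ID:?set CLUSTER_ID}"
    "-Dkubernetes.namespace=${NAMESPACE:-default}"
    "-Dkubernetes.container.image=${IMAGE:?set IMAGE}"
    local:///opt/flink/examples/streaming/StateMachineExample.jar
  )
  if [[ "${DRY_RUN:-0}" == "1" ]]; then
    printf '%s\n' "${cmd[@]}"
  else
    "${cmd[@]}"
  fi
}
```

For example, running "CLUSTER_ID=my-cluster IMAGE=my-image SAVEPOINT_PATH=<savepoint URL> DRY_RUN=1 submit_from_savepoint" prints the arguments for review; drop DRY_RUN to actually submit.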

[1]. https://issues.apache.org/jira/browse/FLINK-26316


Best,
Yang

Andrey Bulgakov <m...@andreiko.ru> wrote on Wed, Feb 23, 2022 at 04:09:

> Thank you, Yang. That was it! Specifying "--fromSavepoint" and
> "--allowNonRestoredState" for "run-application" together did the trick.
>
> I was a bit confused, because when you run "flink run-application --help",
> it only tells you about the "--executor" and "--target" options. So I
> assumed I should pass everything else as -D params. I had only tried
> passing "--allowNonRestoredState" on the CLI as a last resort, but didn't
> think to combine it with "--fromSavepoint".
>
> Thanks again!
>
> On Sun, Feb 20, 2022 at 9:49 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> By design, arbitrary config keys should be supported via the CLI when
>> using the generic CLI mode.
>>
>> Have you also specified "--fromSavepoint" along with
>> "--allowNonRestoredState" when submitting the Flink job via "flink
>> run-application"?
>>
>> From the current code base, it seems that the CLI options (e.g.
>> --fromSavepoint, --allowNonRestoredState) have higher priority than the
>> Flink config options, which causes the savepoint-related config options
>> to be wrongly overwritten. Refer to the implementation[1].
>>
>> [1].
>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
>>
>>
>> Best,
>> Yang
>>
Andrey Bulgakov <m...@andreiko.ru> wrote on Sat, Feb 19, 2022 at 08:30:
>>
>>> Hi Austin,
>>>
>>> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>>>
>>> But for what it's worth, I'm setting a few options unrelated to
>>> Kubernetes this way and they all take effect:
>>>     -Dstate.checkpoints.num-retained=100 \
>>>     -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
>>>     -Dio.tmp.dirs=/data/flink-local-data \
>>>     -Dqueryable-state.enable=true \
>>>
>>> The only one I'm having problems with is
>>> "execution.savepoint.ignore-unclaimed-state".
>>>
>>> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hi Andrey,
>>>>
>>>> It's unclear to me from the docs[1] whether the Flink native Kubernetes
>>>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
>>>> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> [1]:
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>>>>
>>>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru>
>>>> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> I'm working on migrating our Flink job away from Hadoop session mode
>>>>> to K8S application mode.
>>>>> It's been going great so far but I'm hitting a wall with this
>>>>> seemingly simple thing.
>>>>>
>>>>> In the first phase of the migration I want to remove some operators
>>>>> (their state can be discarded) and focus on getting the primary pipeline
>>>>> running first.
>>>>> For that I have to start the cluster from a savepoint with the
>>>>> "allowNonRestoredState" parameter turned on.
>>>>>
>>>>> The problem is that I can't set it in any way that I'm aware of. I
>>>>> tried 4 ways separately and simultaneously:
>>>>>
>>>>> 1) Adding --allowNonRestoredState to flink run-application
>>>>> -t kubernetes-application
>>>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>>>>> run-application -t kubernetes-application
>>>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my
>>>>> local flink-conf.yaml where I'm running flink run-application
>>>>> 4) Overriding it in the application code:
>>>>>     val sigh = new Configuration()
>>>>>     sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>>>>>     env.configure(sigh)
>>>>>
>>>>> Every time, the resulting pod ends up with a "false" value for this
>>>>> setting in its ConfigMap:
>>>>> $ kc describe cm/flink-config-flink-test | grep ignore
>>>>> execution.savepoint.ignore-unclaimed-state: false
>>>>>
>>>>> And I get the exception:
>>>>> java.lang.IllegalStateException: Failed to rollback to
>>>>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
>>>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>>>>> operator is not available in the new program. If you want to allow to skip
>>>>> this, you can set the --allowNonRestoredState option on the CLI.
>>>>>
>>>>> It seems like something overrides it to false and it never has any
>>>>> effect.
>>>>>
>>>>> Could this be a bug, or am I doing something wrong?
>>>>>
>>>>> For context, the savepoint was produced by Flink 1.8.2 and the version
>>>>> I'm trying to run on K8S is 1.14.3.
>>>>>
>>>>> --
>>>>> With regards,
>>>>> Andrey Bulgakov
>>>>>
>>>>>
>>>
>>> --
>>> With regards,
>>> Andrey Bulgakov
>>>
>>
>
> --
> With regards,
> Andrey Bulgakov
>
