The job fails when starting because its arguments are passed through the
Flink configuration in application deployment mode.

>This is a known limit of the current Flink options parser. Refer to
FLINK-15358[1] for more information.

Exactly. The issue stems from the GlobalConfiguration#loadYAMLResource:
https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189

It's indeed a long-standing issue. We could easily replace the parsing
logic with a standard YAML parser, we even have Jackson with YAML support
built into flink-core. However, I think we worry that this might be
breaking some applications which rely on the lenient behavior of the
existing parser.

-Max

On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:

> Hi Yang,
>
> Do you think flink-conf not supporting `#` in FLINK-15358[1]  and Flink
> job spec not supporting `#` are caused by some common code?   or maybe they
> are in different code paths?  My first guess was they are in different
> code paths. The flink-conf is parsed when starting the flink cluster while
> job spec is parsed when starting the job application.
>
> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao <xiangcaohe...@gmail.com>
> wrote:
>
>> Hi Gyula,
>>
>> Thanks for getting back. Could you share how to submit job to
>> flinkk8operator in json format?
>>
>> We use the java Fabric8 K8 client, which serializes java FlinkDeployment 
>> objects
>> to CustomResource YAML (see the code snippet below).  Since `#` is
>> considered a special character denoting comments in YAML,  it should be
>> escaped properly when YAML file is generated. We are also reading into the
>> code to see if we can identify the place for the fix.
>>
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>> import
>> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>
>> FlinkDeployment deployment = xxxx;
>> CustomResourceDefinitionContext context = xxx;
>> DefaultKubernetesClient client = xxx;
>>
>>     client
>>           .customResources(
>>               context, FlinkDeployment.class, FlinkDeploymentList.class)
>>           .inNamespace(xxx)
>>           .withName(deploymentName)
>>           .createOrReplace(deployment);
>>
>>
>>
>>
>>
>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>>> This is a known limit of the current Flink options parser. Refer to
>>> FLINK-15358[1] for more information.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>
>>> Best,
>>> Yang
>>>
>>> Gyula Fóra <gyula.f...@gmail.com> 于2022年11月8日周二 14:41写道:
>>>
>>>> It is also possible that this is a problem of the Flink native
>>>> Kubernetes integration, we have to check where exactly it goes wrong before
>>>> we try to fix it .
>>>>
>>>> We simply set the args into a Flink config and pass it to the native
>>>> deployment logic in the operator.
>>>>
>>>> Gyula
>>>>
>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> How do you submit your yaml?
>>>>>
>>>>> It’s possible that this is not operator problem. Did you try
>>>>> submitting the deployment in json format instead?
>>>>>
>>>>> If it still doesn't work please open a JIRA ticket with the details to
>>>>> reproduce and what you have tried :)
>>>>>
>>>>> Cheers
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xiangcaohe...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a job that contains `#` as part of mainArgs and it used to
>>>>>> work on Ververica. Now we are switching to our own control plane to 
>>>>>> deploy
>>>>>> to flink-operaotor and the job started to fail due to the main args 
>>>>>> string
>>>>>> getting truncated at `#` character when passed to flink application. I
>>>>>> believe this is due to characters after `#` being interpreted as comments
>>>>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>>>>> needs to escape `#` when generating k8 yaml file.
>>>>>>
>>>>>> Assuming the mainArgs contain '\"xyz#abc\".
>>>>>>
>>>>>> Here is the stack-trace:
>>>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>>>>
>>>>>> for key  '$internal.application.program-args'.\n\tat
>>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>>>>>  
>>>>>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>>>>>  
>>>>>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>>>>>  
>>>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>>>>>  
>>>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>>>>>>  by: *java.lang.IllegalArgumentException: Could not split string. 
>>>>>> Quoting was not closed properly*.\n\tat 
>>>>>> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>>>>>>  
>>>>>> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>>>>>>  
>>>>>> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>>>>>>  
>>>>>> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>>>>>>  
>>>>>> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>>>>>>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
>>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>>>>>>  5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not 
>>>>>> create application 
>>>>>> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>>>>
>>>>>>
>>>>>>  Can someone take a look and help fixing this issue? or I can help
>>>>>> fixing this if someone can point me in the right direction.
>>>>>>
>>>>>> --
>>>>>> Best Wishes & Regards
>>>>>> Shawn Xiangcao Liu
>>>>>>
>>>>>
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>

Reply via email to