Taking a step back here: I think this needs to be handled in the
application mode in any case. Even if we had a better parser, it would
still treat # as a comment char. The application mode needs to be fixed to
come up with an escape scheme. YAML supports this via \# but that won't
work with our parser. So it needs to be something else. In the meantime, we
could at least add support for escapes in the configuration parser.
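
As a rough illustration of the escape idea, here is a minimal sketch. It is
not Flink code: the class and method names are hypothetical, the existing
lenient behavior is modeled simply as "cut the line at the first `#`", and
the `\#` escape is only one possible scheme, not an agreed design.

```java
// Hypothetical sketch, not part of Flink. It models comment stripping for a
// single "key: value" configuration line.
final class CommentStripSketch {

    /** Models the lenient behavior: everything after the first '#' is dropped. */
    static String stripCommentLenient(String line) {
        return line.split("#", 2)[0].trim();
    }

    /**
     * Escape-aware variant (an assumption for illustration): the two-character
     * sequence "\#" yields a literal '#', while an unescaped '#' starts a comment.
     * All other backslashes are left untouched so existing values keep their meaning.
     */
    static String stripCommentWithEscapes(String line) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '\\' && i + 1 < line.length() && line.charAt(i + 1) == '#') {
                out.append('#');
                i++; // skip the escaped '#'
            } else if (c == '#') {
                break; // unescaped '#': the rest of the line is a comment
            } else {
                out.append(c);
            }
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        String raw = "$internal.application.program-args: \"xyz#abc\"";
        String escaped = "$internal.application.program-args: \"xyz\\#abc\"";
        // The lenient variant cuts the value at '#', leaving an unclosed quote.
        System.out.println(stripCommentLenient(raw));
        // The escape-aware variant keeps the full quoted value.
        System.out.println(stripCommentWithEscapes(escaped));
    }
}
```

With a scheme like this, whatever writes the configuration (for example the
application-mode deployment path) would have to emit `\#` for every literal
`#`. Only `\#` is treated specially here so that values which already contain
backslashes are not reinterpreted.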

CC dev mailing list

-Max

On Tue, Nov 8, 2022 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:

> The job fails when starting because its arguments are passed through the
> Flink configuration in application deployment mode.
>
> >This is a known limit of the current Flink options parser. Refer to
> FLINK-15358[1] for more information.
>
> Exactly. The issue stems from GlobalConfiguration#loadYAMLResource:
> https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189
>
> It's indeed a long-standing issue. We could easily replace the parsing
> logic with a standard YAML parser; we even have Jackson with YAML support
> built into flink-core. However, I think we worry that this might break
> some applications which rely on the lenient behavior of the existing
> parser.
>
> -Max
>
> On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao <xiangcaohe...@gmail.com>
> wrote:
>
>> Hi Yang,
>>
>> Do you think flink-conf not supporting `#` (FLINK-15358[1]) and the Flink
>> job spec not supporting `#` are caused by common code, or are they in
>> different code paths? My first guess was that they are in different code
>> paths: flink-conf is parsed when starting the Flink cluster, while the job
>> spec is parsed when starting the job application.
>>
>> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao <xiangcaohe...@gmail.com>
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Thanks for getting back. Could you share how to submit a job to the
>>> Flink Kubernetes operator in JSON format?
>>>
>>> We use the Java Fabric8 Kubernetes client, which serializes Java
>>> FlinkDeployment objects to CustomResource YAML (see the code snippet
>>> below). Since `#` is a special character denoting comments in YAML, it
>>> should be escaped properly when the YAML file is generated. We are also
>>> reading the code to see if we can identify the place for the fix.
>>>
>>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>>> import io.fabric8.kubernetes.client.DefaultKubernetesClient;
>>> import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>>
>>> FlinkDeployment deployment = xxxx;
>>> CustomResourceDefinitionContext context = xxx;
>>> DefaultKubernetesClient client = xxx;
>>>
>>> client
>>>     .customResources(
>>>         context, FlinkDeployment.class, FlinkDeploymentList.class)
>>>     .inNamespace(xxx)
>>>     .withName(deploymentName)
>>>     .createOrReplace(deployment);
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>>> This is a known limit of the current Flink options parser. Refer to
>>>> FLINK-15358[1] for more information.
>>>>
>>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Gyula Fóra <gyula.f...@gmail.com> wrote on Tue, Nov 8, 2022 at 14:41:
>>>>
>>>>> It is also possible that this is a problem in the Flink native
>>>>> Kubernetes integration. We have to check where exactly it goes wrong
>>>>> before we try to fix it.
>>>>>
>>>>> We simply set the args into a Flink config and pass it to the native
>>>>> deployment logic in the operator.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> How do you submit your yaml?
>>>>>>
>>>>>> It's possible that this is not an operator problem. Did you try
>>>>>> submitting the deployment in JSON format instead?
>>>>>>
>>>>>> If it still doesn't work please open a JIRA ticket with the details
>>>>>> to reproduce and what you have tried :)
>>>>>>
>>>>>> Cheers
>>>>>> Gyula
>>>>>>
>>>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xiangcaohe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have a job that contains `#` as part of mainArgs, and it used to
>>>>>>> work on Ververica. Now we are switching to our own control plane to
>>>>>>> deploy to flink-operator, and the job started to fail because the main
>>>>>>> args string is truncated at the `#` character when passed to the Flink
>>>>>>> application. I believe this is due to characters after `#` being
>>>>>>> interpreted as comments in the YAML file. To support `#` in mainArgs,
>>>>>>> the Flink operator needs to escape `#` when generating the Kubernetes
>>>>>>> YAML file.
>>>>>>>
>>>>>>> Assume the mainArgs contain `\"xyz#abc\"`.
>>>>>>>
>>>>>>> Here is the stack-trace:
>>>>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>>>>> not parse value '\"xyz' *(Note: truncated by #)* for key
>>>>>>> '$internal.application.program-args'.\n\tat
>>>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>>>>>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>>>>>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>>>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>>>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>>>>>>> by: *java.lang.IllegalArgumentException: Could not split string.
>>>>>>> Quoting was not closed properly*.\n\tat
>>>>>>> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>>>>>>> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>>>>>>> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>>>>>>> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>>>>>>> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>>>>>>> java.base/java.util.Optional.map(Optional.java:265)\n\tat
>>>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>>>>>>> 5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>>>>>
>>>>>>>
>>>>>>> Can someone take a look and help fix this issue? Alternatively, I can
>>>>>>> help fix it if someone can point me in the right direction.
>>>>>>>
>>>>>>> --
>>>>>>> Best Wishes & Regards
>>>>>>> Shawn Xiangcao Liu
>>>>>>>
>>>>>>
>>>
>>> --
>>> Best Wishes & Regards
>>> Shawn Xiangcao Liu
>>>
>>
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>
