The job fails when starting because its arguments are passed through the Flink configuration in application deployment mode.
> This is a known limit of the current Flink options parser. Refer to
> FLINK-15358[1] for more information.

Exactly. The issue stems from GlobalConfiguration#loadYAMLResource:
https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189

It's indeed a long-standing issue. We could easily replace the parsing logic
with a standard YAML parser; we even have Jackson with YAML support built
into flink-core. However, the worry is that this might break applications
which rely on the lenient behavior of the existing parser.

-Max

On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao <xiangcaohe...@gmail.com> wrote:

> Hi Yang,
>
> Do you think flink-conf not supporting `#` in FLINK-15358[1] and the Flink
> job spec not supporting `#` are caused by some common code, or are they in
> different code paths? My first guess was that they are in different code
> paths: flink-conf is parsed when starting the Flink cluster, while the job
> spec is parsed when starting the job application.
>
> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao <xiangcaohe...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Thanks for getting back. Could you share how to submit a job to the
>> flink-kubernetes-operator in JSON format?
>>
>> We use the Java Fabric8 Kubernetes client, which serializes Java
>> FlinkDeployment objects to CustomResource YAML (see the code snippet
>> below). Since `#` is a special character denoting comments in YAML, it
>> should be escaped properly when the YAML file is generated. We are also
>> reading the code to see if we can identify the place for the fix.
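The truncation being discussed can be illustrated with a minimal sketch. This
is a hypothetical simplification for illustration only, not Flink's actual
GlobalConfiguration#loadYAMLResource code: a line-oriented "key: value" parser
that strips everything after `#` as a comment, even inside a quoted value,
turns `"xyz#abc"` into the unbalanced token `"xyz`.

```java
// Hypothetical simplification (not Flink's actual parser) of the lenient
// comment handling described in FLINK-15358: everything after '#' is
// dropped, even when the '#' sits inside a quoted value.
public class CommentTruncation {
    static String parseValue(String line) {
        // Strip the "comment" first, ignoring quoting entirely.
        int comment = line.indexOf('#');
        String effective = comment >= 0 ? line.substring(0, comment) : line;
        // Everything after the first ':' is the value; the closing quote
        // that followed the '#' is gone, leaving the value unbalanced.
        int colon = effective.indexOf(':');
        return effective.substring(colon + 1).trim();
    }

    public static void main(String[] args) {
        // Prints "xyz (with a dangling opening quote) -- the rest of the
        // argument was swallowed as a comment.
        System.out.println(parseValue(
                "$internal.application.program-args: \"xyz#abc\""));
    }
}
```

A real YAML parser would keep the full string, because a `#` inside quotes is
not a comment in YAML; the lenient line-based parsing is what loses it.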
>>
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>> import io.fabric8.kubernetes.client.DefaultKubernetesClient;
>> import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>
>> FlinkDeployment deployment = xxxx;
>> CustomResourceDefinitionContext context = xxx;
>> DefaultKubernetesClient client = xxx;
>>
>> client
>>     .customResources(context, FlinkDeployment.class, FlinkDeploymentList.class)
>>     .inNamespace(xxx)
>>     .withName(deploymentName)
>>     .createOrReplace(deployment);
>>
>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>>> This is a known limit of the current Flink options parser. Refer to
>>> FLINK-15358[1] for more information.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>
>>> Best,
>>> Yang
>>>
>>> Gyula Fóra <gyula.f...@gmail.com> wrote on Tue, Nov 8, 2022 at 14:41:
>>>
>>>> It is also possible that this is a problem of the Flink native
>>>> Kubernetes integration; we have to check where exactly it goes wrong
>>>> before we try to fix it.
>>>>
>>>> We simply set the args into a Flink config and pass it to the native
>>>> deployment logic in the operator.
>>>>
>>>> Gyula
>>>>
>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> How do you submit your YAML?
>>>>>
>>>>> It's possible that this is not an operator problem. Did you try
>>>>> submitting the deployment in JSON format instead?
>>>>>
>>>>> If it still doesn't work, please open a JIRA ticket with the details
>>>>> to reproduce and what you have tried :)
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xiangcaohe...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a job that contains `#` as part of mainArgs, and it used to
>>>>>> work on Ververica. Now we are switching to our own control plane to
>>>>>> deploy to the flink-kubernetes-operator, and the job started to fail
>>>>>> because the mainArgs string gets truncated at the `#` character when
>>>>>> passed to the Flink application. I believe this is due to characters
>>>>>> after `#` being interpreted as comments in a YAML file. To support
>>>>>> `#` in the mainArgs, the Flink operator needs to escape `#` when
>>>>>> generating the Kubernetes YAML file.
>>>>>>
>>>>>> Assume the mainArgs contain "xyz#abc".
>>>>>>
>>>>>> Here is the stack trace (from an ERROR log of
>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint, message
>>>>>> "Could not create application program.", thread main,
>>>>>> 2022-11-07T18:40:03.369+00:00):
>>>>>>
>>>>>> java.lang.IllegalArgumentException: Could not parse value '"xyz'
>>>>>> (Note: truncated by #) for key '$internal.application.program-args'.
>>>>>>     at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
>>>>>>     at org.apache.flink.configuration.Configuration.get(Configuration.java:704)
>>>>>>     at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)
>>>>>>     at org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)
>>>>>>     at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)
>>>>>>     at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
>>>>>> Caused by: java.lang.IllegalArgumentException: Could not split
>>>>>> string. Quoting was not closed properly.
>>>>>>     at org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)
>>>>>>     at org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)
>>>>>>     at org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)
>>>>>>     at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)
>>>>>>     at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)
>>>>>>     at java.base/java.util.Optional.map(Optional.java:265)
>>>>>>     at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)
>>>>>>     ... 5 more
>>>>>>
>>>>>> Can someone take a look and help fix this issue? Or I can help fix it
>>>>>> if someone can point me in the right direction.
>>>>>>
>>>>>> --
>>>>>> Best Wishes & Regards
>>>>>> Shawn Xiangcao Liu
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
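The "Caused by" in the stack trace follows directly from the truncation: once
the configured value has been cut down to `"xyz`, the opening quote has no
matching close. A minimal sketch of such a quote-consuming check (an
illustrative assumption, not Flink's actual StructuredOptionsSplitter code):

```java
// Minimal sketch (not Flink's actual StructuredOptionsSplitter code) of a
// quote-consuming step that fails on the truncated value.
public class QuoteCheck {
    static String consumeInQuotes(String s) {
        char quote = s.charAt(0);          // the opening quote character
        int close = s.indexOf(quote, 1);   // look for the matching close
        if (close < 0) {
            // This is the failure mode seen in the stack trace.
            throw new IllegalArgumentException(
                    "Could not split string. Quoting was not closed properly.");
        }
        return s.substring(1, close);      // the unquoted content
    }

    public static void main(String[] args) {
        // The intact value parses fine -- prints: xyz#abc
        System.out.println(consumeInQuotes("\"xyz#abc\""));
        // The truncated value throws IllegalArgumentException.
        consumeInQuotes("\"xyz");
    }
}
```

So the two error messages in the log are one bug: the lenient `#` handling
upstream produces the unbalanced quote that this check then rejects.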