[ 
https://issues.apache.org/jira/browse/FLINK-36457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Pilipenko updated FLINK-36457:
----------------------------------------
    Description: 
When using the Python DataStream API with the standard YAML configuration, calls to 
{{stream_env.add_jars}} or {{stream_env.add_classpaths}} result in a 
corrupted execution config.
h3. Example:

Attempt to execute the code below with {{./bin/flink run --python job.py}}:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    
    env.add_jars("file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar")
    
    env.from_collection(["a", "b", "c"], type_info=Types.STRING()) \
        .print()

    env.execute()
{code}
This fails with the following error:
{code:java}
Traceback (most recent call last):
  File "/Users/a.pilipenko/test.py", line 12, in <module>
    env.execute()
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: java.net.MalformedURLException: no protocol: ['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar']
        at java.base/java.net.URL.<init>(URL.java:645)
        at java.base/java.net.URL.<init>(URL.java:541)
        at java.base/java.net.URL.<init>(URL.java:488)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:192)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more
{code}
h3. Cause:

This issue is caused by the 
[implementation|https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/stream_execution_environment.py#L566-L573]
 of {{stream_execution_environment#add_jars}} and 
{{stream_execution_environment#add_classpaths}}, which was not updated to comply 
with the YAML representation of list values.

After calling {{add_jars(...)}} from the example above, the value in the effective 
configuration will be:
{code:java}
['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar'];file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar
{code}
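Since this string is no longer a valid YAML list, parsing falls back to the 
legacy format and splits the string on {{';'}}. The first resulting element 
still carries the list brackets and quotes, so it is not a valid URL and 
{{java.net.URL}} rejects it with the {{MalformedURLException}} shown above.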
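
The fallback described above can be illustrated with a small standalone sketch. It is not Flink's parsing code: {{legacy_split}} and {{has_scheme}} are hypothetical helpers, the scheme check is a simplified stand-in for what {{java.net.URL}} does, and the paths are shortened placeholders that only mirror the shape of the corrupted value.

```python
import re
from typing import List

# Simplified URL scheme prefix, e.g. "file:" or "https:" (assumption: close
# enough to the protocol check that produces "no protocol" in java.net.URL).
_SCHEME = re.compile(r"^[A-Za-z][A-Za-z0-9+.\-]*:")


def legacy_split(value: str) -> List[str]:
    """Split a config value on ';' and drop empty parts (simplified legacy format)."""
    return [part for part in value.split(";") if part]


def has_scheme(url: str) -> bool:
    """Return True if the string starts with something that looks like a URL scheme."""
    return _SCHEME.match(url) is not None


# Hypothetical paths; a YAML-style list with a ';'-separated entry appended to it.
corrupted = "['file:/opt/flink/opt/flink-python.jar'];file:///tmp/connector.jar"

for part in legacy_split(corrupted):
    print(part, "->", "ok" if has_scheme(part) else "no scheme")
```

The first part starts with {{[}} rather than a scheme, so it is reported as having no scheme, while the appended jar URL passes the check.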

  was:
When using the Python DataStream API with the standard YAML configuration, calls to 
{{stream_env.add_jars}} or {{stream_env.add_classpaths}} result in a 
corrupted execution config.
h3. Example:

Attempt to execute the code below with {{./bin/flink run --python job.py}}:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    
    env.add_jars("file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar")
    
    env.from_collection(["a", "b", "c"], type_info=Types.STRING()) \
        .print()

    env.execute()
{code}
This fails with the following error:
{code:java}
Traceback (most recent call last):
  File "/Users/a.pilipenko/test.py", line 12, in <module>
    env.execute()
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: java.net.MalformedURLException: no protocol: ['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar']
        at java.base/java.net.URL.<init>(URL.java:645)
        at java.base/java.net.URL.<init>(URL.java:541)
        at java.base/java.net.URL.<init>(URL.java:488)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:192)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more
{code}
h3. Cause:

This issue is caused by the 
[implementation|https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/stream_execution_environment.py#L566-L573]
 of {{stream_execution_environment#add_jars}} and 
{{stream_execution_environment#add_classpaths}}, which was not updated to comply 
with the YAML representation of list values.

After calling {{add_jars(...)}} from the example above, the value in the effective 
configuration will be:
{code:java}
['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar'];file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar
{code}
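Since this string is no longer a valid YAML list, parsing falls back to the 
legacy format and splits the string on {{';'}}. The first resulting element 
still carries the list brackets and quotes, so it is not a valid URL and 
{{java.net.URL}} rejects it with the {{MalformedURLException}} shown above.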
h3. Workaround:

A possible workaround is to set the jars in the configuration and pass that 
configuration to {{StreamExecutionEnvironment.get_execution_environment}} 
instead of calling {{add_jars}}.
{code:python}
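from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

jars = ["file:///path/to/file_1.jar", "file:///path/to/file_2.jar"]
conf = Configuration()

# str(jars) renders the whole list as a single list literal,
# so nothing is appended to an existing value with ';'.
conf.set_string("pipeline.jars", str(jars))

env = StreamExecutionEnvironment.get_execution_environment(conf)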
{code}
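
When jars must be added to a value that is already set, the same idea as the workaround above can be sketched as a merge step that always emits one list literal. This is only an illustration: {{merge_jar_urls}} is a hypothetical helper and not part of PyFlink, and it assumes the existing value is either empty, a legacy {{';'}}-separated string, or a quoted list literal like the one in the effective configuration. Unquoted YAML lists are not handled.

```python
import ast
from typing import List


def merge_jar_urls(existing: str, new_urls: List[str]) -> str:
    """Merge new jar URLs into an existing config value and return a list literal.

    Hypothetical helper: accepts an empty string, a legacy ';'-separated
    string, or a quoted list literal such as "['file:/a.jar']".
    """
    existing = existing.strip()
    if not existing:
        current = []
    elif existing.startswith("["):
        # Quoted list form; ast.literal_eval safely parses the literal.
        current = list(ast.literal_eval(existing))
    else:
        # Legacy form: entries separated by ';'.
        current = [url for url in existing.split(";") if url]
    # Append only URLs that are not already present, keeping order.
    merged = current + [url for url in new_urls if url not in current]
    return str(merged)


print(merge_jar_urls("['file:/a.jar']", ["file:///b.jar"]))
```

The returned string could then be passed to {{conf.set_string("pipeline.jars", ...)}} as in the workaround.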


> Python DataStream API: calling add_jars or add_classpaths results in 
> corrupted execution config
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36457
>                 URL: https://issues.apache.org/jira/browse/FLINK-36457
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 2.0.0, 1.19.0, 1.20.0, 1.19.1
>            Reporter: Aleksandr Pilipenko
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
