[ https://issues.apache.org/jira/browse/FLINK-36457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aleksandr Pilipenko updated FLINK-36457:
----------------------------------------
    Description: 
When using the Python DataStream API with the standard YAML configuration, calls to {{stream_env.add_jars}} or {{stream_env.add_classpaths}} will result in a corrupted execution config.

h3. Example:
An attempt to execute the code below with {{./bin/flink run --python job.py}}
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar")

    env.from_collection(["a", "b", "c"], type_info=Types.STRING()) \
        .print()

    env.execute()
{code}
will fail with the following error:
{code:java}
Traceback (most recent call last):
  File "/Users/a.pilipenko/test.py", line 12, in <module>
    env.execute()
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: java.net.MalformedURLException: no protocol: ['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar']
    at java.base/java.net.URL.<init>(URL.java:645)
    at java.base/java.net.URL.<init>(URL.java:541)
    at java.base/java.net.URL.<init>(URL.java:488)
    at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
    at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:192)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    ... 14 more
{code}

h3. Cause:
This issue is caused by the [implementation|https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/stream_execution_environment.py#L566-L573] of {{stream_execution_environment#add_jars}} and {{stream_execution_environment#add_classpaths}} not having been updated to comply with the YAML representation of list values.

After calling {{add_jars(...)}} from the example above, the value in the effective configuration will be
{code:java}
['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar'];file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar
{code}
Since this string is no longer a valid YAML list, parsing will fall back to the legacy format, splitting the string on {{';'}}.
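For illustration, here is a minimal plain-Python sketch (not the actual Flink client code) of the legacy fallback just described; the corrupted value is the one from the code block above, and splitting on {{';'}} mirrors the legacy list format:
{code:python}
# Illustrative sketch only: mimics the legacy ';'-separated parsing that the
# client falls back to once the value is no longer a valid YAML list.
corrupted_value = (
    "['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar'];"
    "file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar"
)
entries = corrupted_value.split(';')
# The first entry still carries the YAML-list brackets and quotes, so it is not a
# valid URL, matching the MalformedURLException ("no protocol: ['file:/...']") above.
print(entries[0])
{code}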
  was:
When using the Python DataStream API with the standard YAML configuration, calls to {{stream_env.add_jars}} or {{stream_env.add_classpaths}} will result in a corrupted execution config.

h3. Example:
An attempt to execute the code below with {{./bin/flink run --python job.py}}
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar")

    env.from_collection(["a", "b", "c"], type_info=Types.STRING()) \
        .print()

    env.execute()
{code}
will fail with the following error:
{code:java}
Traceback (most recent call last):
  File "/Users/a.pilipenko/test.py", line 12, in <module>
    env.execute()
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: java.net.MalformedURLException: no protocol: ['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar']
    at java.base/java.net.URL.<init>(URL.java:645)
    at java.base/java.net.URL.<init>(URL.java:541)
    at java.base/java.net.URL.<init>(URL.java:488)
    at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
    at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:192)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    ... 14 more
{code}

h3. Cause:
This issue is caused by the [implementation|https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/stream_execution_environment.py#L566-L573] of {{stream_execution_environment#add_jars}} and {{stream_execution_environment#add_classpaths}} not having been updated to comply with the YAML representation of list values.

After calling {{add_jars(...)}} from the example above, the value in the effective configuration will be
{code:java}
['file:/Users/a.pilipenko/Dev/flink-dist/flink-1.20.0/opt/flink-python-1.20.0.jar'];file:///Users/a.pilipenko/Downloads/flink-connector-kafka-3.2.0-1.19.jar
{code}
Since this string is no longer a valid YAML list, parsing will fall back to the legacy format, splitting the string on {{';'}}.

h3. Workaround:
A possible workaround is to add the jars to the configuration and then pass it to {{stream_execution_environment}} instead of calling {{add_jars}}.
{code:python}
jars = ["file:///path/to/file_1.jar", "file:///path/to/file_2.jar"]
conf = Configuration()
conf.set_string("pipeline.jars", str(jars))
env = StreamExecutionEnvironment.get_execution_environment(conf)
{code}
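The same approach should presumably work where {{add_classpaths}} would otherwise be used, by setting {{pipeline.classpaths}} (the configuration option backing {{add_classpaths}}) in the same way. This is an untested sketch and the paths are placeholders:
{code:python}
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Untested sketch mirroring the pipeline.jars workaround above for classpath entries;
# the URLs below are placeholders.
classpaths = ["file:///path/to/classpath_1/", "file:///path/to/classpath_2/"]
conf = Configuration()
conf.set_string("pipeline.classpaths", str(classpaths))
env = StreamExecutionEnvironment.get_execution_environment(conf)
{code}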

> Python DataStream API: calling add_jars or add_classpaths results in corrupted execution config
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36457
>                 URL: https://issues.apache.org/jira/browse/FLINK-36457
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 2.0.0, 1.19.0, 1.20.0, 1.19.1
>            Reporter: Aleksandr Pilipenko
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)