Xin Yu created FLINK-24335:
------------------------------

Summary: Get java.net.MalformedURLException when submit job from python by pyflink table api.
Key: FLINK-24335
URL: https://issues.apache.org/jira/browse/FLINK-24335
Project: Flink
Issue Type: Bug
Components: API / Python, Client / Job Submission, Table SQL / API
Affects Versions: 1.11.2
Environment: Linux, Flink 1.11.2
Reporter: Xin Yu

When I run the Flink client to submit a Python-based workflow, I get a MalformedURLException like this:

{{Traceback (most recent call last):
  File "/opt/python-occlum/lib/python3.7/site-packages/ai_flow_plugins/job_plugins/flink/flink_run_main.py", line 96, in run_project
    flink_execute_func(run_graph=run_graph, job_execution_info=job_execution_info, flink_env=flink_env)
  File "/opt/python-occlum/lib/python3.7/site-packages/ai_flow_plugins/job_plugins/flink/flink_run_main.py", line 75, in flink_execute_func
    job_client = statement_set.execute().get_job_client()
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/statement_set.py", line 104, in execute
    return TableResult(self._j_statement_set.execute())
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'
Traceback (most recent call last):
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o24.execute.
: org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:721)
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.MalformedURLException: no protocol: 
	at java.base/java.net.URL.<init>(URL.java:645)
	at java.base/java.net.URL.<init>(URL.java:541)
	at java.base/java.net.URL.<init>(URL.java:488)
	at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:127)
	at org.apache.flink.client.cli.ExecutionConfigAccessor.getClasspaths(ExecutionConfigAccessor.java:79)
	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:62)
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:57)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:705)
	... 12 more}}

After some debugging, I found that the problem is related to TableEnvironment.execute_sql. The root cause is TableEnvironment._add_jars_to_j_env_config in pyflink/table/table_environment.py.

{{if j_configuration.containsKey(config_key):}}
{{    for url in j_configuration.getString(config_key, "").split(";"):}}
{{        jar_urls_set.add(url)}}

In our case, pipeline.classpaths was set to an empty list value by FromProgramOption, so the code above introduces an empty string ("") into pipeline.classpaths, for example "a.jar;b.jar;;c.jar", which causes the exception shown in the stack trace.

Another problem: the iteration order of a set of strings in Python is not deterministic, so ";".join(jar_urls_set) does NOT preserve the classpath order. A list is more suitable in this case.
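
To illustrate both points, here is a minimal sketch in plain Python of how the merge could skip empty entries and keep a stable order. It is not the PyFlink code and not a proposed patch: merge_jar_urls is a hypothetical helper, the file:/// URLs are made-up examples, and putting the already-configured entries before the new ones is an assumption about the desired order.

```python
from typing import Iterable, List


def merge_jar_urls(existing_value: str, new_urls: Iterable[str]) -> str:
    """Merge ';'-separated URLs, dropping empty entries and duplicates.

    Hypothetical helper for illustration only; not part of PyFlink.
    """
    merged: List[str] = []
    seen = set()
    # Assumption: entries already in the configuration come first.
    for url in existing_value.split(";") + list(new_urls):
        url = url.strip()
        # "".split(";") returns [""], so an empty config value would
        # otherwise add an empty entry that java.net.URL rejects.
        if not url or url in seen:
            continue
        seen.add(url)
        merged.append(url)
    # Joining a list (not a set) keeps the first-seen order.
    return ";".join(merged)


if __name__ == "__main__":
    # The empty string is what the set-based loop picks up.
    print("".split(";"))
    # An empty existing value contributes nothing.
    print(merge_jar_urls("", ["file:///a.jar", "file:///b.jar"]))
    # Empty entries and duplicates are dropped, order is kept.
    print(merge_jar_urls("file:///a.jar;;file:///c.jar",
                         ["file:///b.jar", "file:///a.jar"]))
```

With a list plus a "seen" set, duplicates are still removed, but the resulting string no longer depends on set iteration order.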