Xin Yu created FLINK-24335:
------------------------------

             Summary: Get java.net.MalformedURLException when submit job from python by pyflink table api.
                 Key: FLINK-24335
                 URL: https://issues.apache.org/jira/browse/FLINK-24335
             Project: Flink
          Issue Type: Bug
          Components: API / Python, Client / Job Submission, Table SQL / API
    Affects Versions: 1.11.2
         Environment: Linux, Flink 1.11.2
            Reporter: Xin Yu


When I run the Flink client to submit a Python-based workflow, I get the
following MalformedURLException:

{noformat}
Traceback (most recent call last):
  File "/opt/python-occlum/lib/python3.7/site-packages/ai_flow_plugins/job_plugins/flink/flink_run_main.py", line 96, in run_project
    flink_execute_func(run_graph=run_graph, job_execution_info=job_execution_info, flink_env=flink_env)
  File "/opt/python-occlum/lib/python3.7/site-packages/ai_flow_plugins/job_plugins/flink/flink_run_main.py", line 75, in flink_execute_func
    job_client = statement_set.execute().get_job_client()
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/statement_set.py", line 104, in execute
    return TableResult(self._j_statement_set.execute())
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'
Traceback (most recent call last):
  File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o24.execute.
: org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:721)
	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.MalformedURLException: no protocol: 
	at java.base/java.net.URL.<init>(URL.java:645)
	at java.base/java.net.URL.<init>(URL.java:541)
	at java.base/java.net.URL.<init>(URL.java:488)
	at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:127)
	at org.apache.flink.client.cli.ExecutionConfigAccessor.getClasspaths(ExecutionConfigAccessor.java:79)
	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:62)
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:57)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
	at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:705)
	... 12 more
{noformat}
After some debugging, I found that the problem is related to
{{TableEnvironment.execute_sql}}. The root cause is
{{TableEnvironment._add_jars_to_j_env_config}} in
{{pyflink/table/table_environment.py}}:

{code:python}
if j_configuration.containsKey(config_key):
    for url in j_configuration.getString(config_key, "").split(";"):
        jar_urls_set.add(url)
{code}

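In our case, pipeline.classpaths had been set to an empty list value
from FromProgramOption. Because "".split(";") returns [""], the code
above adds an empty string ("") to jar_urls_set. After the set is joined
with ";", pipeline.classpaths contains an empty entry, for example
"a.jar;b.jar;;c.jar", and converting that empty entry into a
java.net.URL throws the MalformedURLException ("no protocol: ") shown
above.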
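The empty entry can be reproduced in plain Python without a JVM. This is an illustrative sketch only: {{collect_urls_buggy}} and {{collect_urls_fixed}} are hypothetical helpers, and plain strings stand in for the Java Configuration lookup. The first mirrors the quoted loop; the second assumes that skipping blank segments is an acceptable fix.

```python
from typing import Iterable, Set


def collect_urls_buggy(existing_value: str, new_urls: Iterable[str]) -> Set[str]:
    # Mirrors the quoted loop: every ";"-separated segment of the existing
    # value is added, including "" when the existing value is empty.
    jar_urls_set: Set[str] = set(new_urls)
    for url in existing_value.split(";"):
        jar_urls_set.add(url)
    return jar_urls_set


def collect_urls_fixed(existing_value: str, new_urls: Iterable[str]) -> Set[str]:
    # Same merge, but blank segments (from "" or "a.jar;;b.jar") are skipped.
    segments = list(new_urls) + existing_value.split(";")
    return {url.strip() for url in segments if url.strip()}


if __name__ == "__main__":
    # An empty config value splits into one empty segment, not zero segments.
    print("".split(";"))  # ['']
    new = ["file:///a.jar", "file:///b.jar"]  # hypothetical jar URLs
    # sorted() only makes the printed output stable for this demo.
    print(";".join(sorted(collect_urls_buggy("", new))))  # ;file:///a.jar;file:///b.jar
    print(";".join(sorted(collect_urls_fixed("", new))))  # file:///a.jar;file:///b.jar
```

The leading ";" in the first joined string is exactly the kind of empty entry that the Java side later fails to parse as a URL.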


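Another problem: iteration order over a Python set is not deterministic
(it does not follow insertion order, and for strings it can change
between interpreter runs because of hash randomization), so
";".join(jar_urls_set) does NOT preserve the classpath order. A list is
more suitable in this case.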
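An order-preserving merge can be sketched as follows. It is a minimal illustration, not the PyFlink implementation: {{merge_classpaths}} is a hypothetical helper, and it uses {{dict.fromkeys}} (insertion-ordered since Python 3.7, the version in the traceback) instead of a plain list; a list with an "if url not in ..." check would give the same result. Putting existing entries before new ones is also an assumption.

```python
from typing import Iterable


def merge_classpaths(existing_value: str, new_urls: Iterable[str]) -> str:
    # Existing entries first, then new ones (this ordering is an assumption).
    candidates = [u.strip() for u in existing_value.split(";")]
    candidates += [u.strip() for u in new_urls]
    # dict.fromkeys keeps only the first occurrence of each key and, since
    # Python 3.7, preserves insertion order; blank entries are dropped.
    unique = dict.fromkeys(u for u in candidates if u)
    return ";".join(unique)


if __name__ == "__main__":
    existing = "file:///c.jar;file:///a.jar"  # hypothetical current value
    print(merge_classpaths(existing, ["file:///b.jar", "file:///a.jar"]))
    # file:///c.jar;file:///a.jar;file:///b.jar
    print(merge_classpaths("", ["file:///a.jar"]))  # file:///a.jar
```

Deduplicating through a dict keeps membership checks O(1) while still yielding the URLs in first-seen order, which addresses both the ordering problem and the empty-entry problem at once.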



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
