William Que created FLINK-36766:
-----------------------------------

             Summary: Use pyflink to create remote env
                 Key: FLINK-36766
                 URL: https://issues.apache.org/jira/browse/FLINK-36766
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.19.1, 1.20.0
         Environment: Ubuntu 24 LTSC

Flink : 1.19.1  or 1.20.0

 
            Reporter: William Que


I use the following code to connect to a remote Flink cluster and create a 
remote Flink environment. After adding jar files to the 
StreamExecutionEnvironment, every execution of Flink SQL fails with an error 
that looks like a YAML parsing error.
{code:java}
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import StreamTableEnvironment

gateway = get_gateway()
string_class = gateway.jvm.String
string_array = gateway.new_array(string_class, 0)
stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
j_stream_exection_environment = stream_env.createRemoteEnvironment("master", 8081, string_array)
env = StreamExecutionEnvironment(j_stream_exection_environment)

jars_path = "F:/jars/flink-1.19.1/"
jar_files = ["file:///" + jars_path + f
             for f in os.listdir(jars_path) if f.endswith('.jar')]
jar_files_str = ';'.join(jar_files)
env.add_jars(*jar_files)  # causes the error

t_env = StreamTableEnvironment.create(env)
{code}
 

I traced the error and found that it is caused by a static method in 
configuration.py of the pyflink package. After env.add_jars(*jar_files), the 
value parameter looks like this, which causes the above error:

value = '{color:#FF0000}[];{color}file:///F:/software/jars-flink3/flink-clients-1.20.0.jar;file:///F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar;......'
{code:java}
    @staticmethod
    def parse_jars_value(value: str, jvm):
        is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
        if is_standard_yaml:
            from ruamel.yaml import YAML
            yaml = YAML(typ='safe')
            jar_urls_list = yaml.load(value)  # ERROR 
            if isinstance(jar_urls_list, list):
                return jar_urls_list
        return value.split(";") {code}
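
The value shown above starts with a literal "[]" followed by 
semicolon-separated URLs, so it is not a YAML list as a whole, and a plain 
split on ";" would keep "[]" as if it were a jar path. The following is only a 
minimal sketch of a more tolerant split, assuming the "[]" token is an 
empty-list placeholder that can safely be discarded. normalize_jars_value is a 
hypothetical helper for illustration, not part of pyflink, and it deliberately 
does no YAML parsing.

```python
def normalize_jars_value(value: str) -> list:
    """Split a ';'-separated jar string, dropping empty and '[]' entries.

    Hypothetical helper for illustration; not part of pyflink.
    """
    entries = (part.strip() for part in value.split(";"))
    # Assumption: a bare "[]" is an empty-list placeholder, not a jar URL.
    return [entry for entry in entries if entry and entry != "[]"]


# Illustrative paths only, shaped like the value reported above.
value = "[];file:///F:/jars/a.jar;file:///F:/jars/b.jar"
print(normalize_jars_value(value))
```

This mirrors the manual workaround of removing the "[];" prefix; it does not 
address why the prefix ends up in the value in the first place.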
 

I tried to fix it by removing the "[];" prefix from the value parameter. That 
solved the first problem, but another one then appeared: the jar files seem to 
be added to the classpath twice somewhere. 
{code:java}
Caused by: java.lang.IllegalStateException: The library registration references 
a different set of library BLOBs than previous registrations for this job:
old:[file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, 
file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, 
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, 
file:/F:/software/jars-flink3/flink-json-1.20.0.jar, 
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, 
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar]
new:[file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, 
file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, 
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, 
file:/F:/software/jars-flink3/flink-json-1.20.0.jar, 
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, 
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, 
file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, 
file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, 
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, 
file:/F:/software/jars-flink3/flink-json-1.20.0.jar, 
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, 
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar]{code}
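
The "new" set in the trace lists each jar twice. Where the duplication happens 
is not identified in this report, so whether removing duplicates on the Python 
side would avoid the exception is an assumption. As a minimal sketch, an 
order-preserving de-duplication of a jar URL list could look like this. 
dedupe_jars is a hypothetical helper, not a pyflink API.

```python
def dedupe_jars(jar_urls):
    """Return jar_urls without repeats, keeping the first occurrence of each.

    Hypothetical helper for illustration; not part of pyflink.
    """
    # dict keys keep insertion order (Python 3.7+), so order is preserved.
    return list(dict.fromkeys(jar_urls))


# Illustrative input shaped like the "new" set above: every jar appears twice.
jars = [
    "file:/F:/software/jars-flink3/flink-clients-1.20.0.jar",
    "file:/F:/software/jars-flink3/flink-json-1.20.0.jar",
    "file:/F:/software/jars-flink3/flink-clients-1.20.0.jar",
    "file:/F:/software/jars-flink3/flink-json-1.20.0.jar",
]
print(dedupe_jars(jars))
```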
 

Please check it carefully.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
