William Que created FLINK-36766:
-----------------------------------

Summary: Use pyflink to create remote env
Key: FLINK-36766
URL: https://issues.apache.org/jira/browse/FLINK-36766
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.19.1, 1.20.0
Environment: Ubuntu 24 LTSC
    Flink: 1.19.1 or 1.20.0
Reporter: William Que

I use the following code to connect to a remote Flink cluster and create a remote Flink environment. After I add jar files to the StreamExecutionEnvironment, every time I execute Flink SQL an error is reported that looks like a YAML parsing error.

{code:python}
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import StreamTableEnvironment

gateway = get_gateway()

# Empty Java String[] for the varargs jarFiles parameter of createRemoteEnvironment
string_class = gateway.jvm.String
string_array = gateway.new_array(string_class, 0)

# Create a Java remote environment for host "master", port 8081,
# then wrap it in a PyFlink StreamExecutionEnvironment
stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
j_stream_execution_environment = stream_env.createRemoteEnvironment("master", 8081, string_array)
env = StreamExecutionEnvironment(j_stream_execution_environment)

# Collect every .jar in a local directory as a file:/// URL
jars_path = "F:/jars/flink-1.19.1/"
jar_files = ["file:///" + jars_path + f for f in os.listdir(jars_path) if f.endswith('.jar')]
jar_files_str = ';'.join(jar_files)  # not used below
env.add_jars(*jar_files)

# Causes the error
t_env = StreamTableEnvironment.create(env)
{code}

I traced the error and found that it comes from a static method in configuration.py of the pyflink package. After env.add_jars(*jar_files), the value parameter looks like this, which causes the error above:

value = '{color:#FF0000}[];{color}file:///F:/software/jars-flink3/flink-clients-1.20.0.jar;file:///F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar;......'
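The shape of that value can be reproduced in plain Python. The sketch below assumes (an unconfirmed guess, not something traced in this report) that the remote environment's jar list is already stored as the string `[]`, an empty list in standard-YAML form, and that adding jars appends the new URLs to the existing value with `;`. Under that assumption the result starts with `[];`, which a YAML loader rejects and which a plain `value.split(";")` turns into a bogus `[]` entry. The helpers `append_jars` and `split_jars_value` are hypothetical; the second one only illustrates a more tolerant way to read such a value and is not PyFlink API.

```python
from typing import List


def append_jars(old_value: str, new_jars: List[str]) -> str:
    """Hypothetical model of the suspected append: old value + ';' + new URLs."""
    joined = ";".join(new_jars)
    if old_value and old_value.strip():
        return ";".join([old_value, joined])
    return joined


def split_jars_value(value: str) -> List[str]:
    """Hypothetical tolerant reader for a jar-list value.

    Accepts a flow-style list such as "['a.jar', 'b.jar']", a ';'-separated
    string, or a mix of both, and drops empty fragments such as "[]".
    Assumes jar URLs contain no ',' or ';'.
    """
    urls: List[str] = []
    for part in value.split(";"):
        part = part.strip()
        if part.startswith("[") and part.endswith("]"):
            # Flow-style list: drop the brackets, split on commas, unquote items.
            items = [item.strip().strip("'\"") for item in part[1:-1].split(",")]
        else:
            items = [part]
        # Skip empty strings left over from "[]" or stray separators.
        urls.extend(item for item in items if item)
    return urls


jars = [
    "file:///F:/software/jars-flink3/flink-clients-1.20.0.jar",
    "file:///F:/software/jars-flink3/flink-json-1.20.0.jar",
]
value = append_jars("[]", jars)  # "[]" is the assumed pre-existing empty-list value
print(value.startswith("[];"))  # True: same shape as the value in the report
print(value.split(";")[0])  # []: a plain split keeps a bogus "[]" entry
print(split_jars_value(value) == jars)  # True
```

Under this model the stray `[]` is produced by the append step, so stripping it only at read time hides the symptom rather than its cause.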
{code:python}
@staticmethod
def parse_jars_value(value: str, jvm):
    is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
    if is_standard_yaml:
        # Standard YAML mode: the value is expected to be a YAML list of jar URLs
        from ruamel.yaml import YAML
        yaml = YAML(typ='safe')
        jar_urls_list = yaml.load(value)  # ERROR
        if isinstance(jar_urls_list, list):
            return jar_urls_list
    # Otherwise fall back to a ';'-separated string
    return value.split(";")
{code}

I tried to work around it by removing the "[];" prefix from value. That fixed the first problem, but another one appeared: it seems the jar files are added to the classpath twice somewhere.

{code:java}
Caused by: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job: old:[file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, file:/F:/software/jars-flink3/flink-json-1.20.0.jar, file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar] new:[file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, file:/F:/software/jars-flink3/flink-json-1.20.0.jar, file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, file:/F:/software/jars-flink3/flink-json-1.20.0.jar, file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar]
{code}

Please check it carefully.
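In the exception above, the new list contains the same six jars as the old list, and each jar appears twice. That pattern fits the jars being registered through two paths. Below is a minimal order-preserving de-duplication. `dedupe_jars` is a hypothetical helper. It does not explain where the second registration comes from. It is also not established here whether the de-duplicated list, which is ordered differently from old, would pass the library-registration check.

```python
from typing import Iterable, List


def dedupe_jars(urls: Iterable[str]) -> List[str]:
    """Hypothetical helper: drop repeated jar URLs, keeping first-seen order."""
    # dict keys keep insertion order (Python 3.7+), so the first occurrence wins.
    return list(dict.fromkeys(urls))


base = "file:/F:/software/jars-flink3/"
once = [base + name for name in (
    "flink-clients-1.20.0.jar",
    "flink-connector-jdbc-3.2.0-1.19.jar",
    "flink-connector-kafka-3.3.0-1.20.jar",
    "flink-json-1.20.0.jar",
    "kafka-clients-3.6.2.jar",
    "mysql-connector-java-8.0.28.jar",
)]
new_list = once + once  # mirrors the "new" list from the exception: every jar twice

print(len(new_list), len(dedupe_jars(new_list)))  # 12 6
```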