Hello, I've been running into a wall for the past few days trying to get my PyFlink cluster running in a Docker container on my local machine.
However, whenever I get to the point in my code where I need to run env.execute(), I see the following error (full stack trace at bottom of email): ``` java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar'] ``` It appears that the `file://` prefix is being shortened to `file:`, but I'm not setting this anywhere in my code, leading me to believe there's underlying behavior that's creating this inaccurate file path. If this error is familiar, I would appreciate any advice as to how to remediate the issue. I've included code snippets of my current implementation below. I've tried using both Flink versions 1.19.2 and 1.20.1. Thank you for any assistance you may be able to provide. CODE SNIPPET: ``` def parse_record(record: str) -> dict: return json.loads(record) def initialize_environment() -> StreamExecutionEnvironment: """Initialize the Flink environment""" env = StreamExecutionEnvironment.get_execution_environment() flink_kafka_connector_path: str = str( (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve() ) flink_python_path: str = str( (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve() ) print(f"flink_kafka_connector_path: {flink_kafka_connector_path}") print(f"flink_python_path: {flink_python_path}") env.add_jars(f"file://{flink_kafka_connector_path}", f"file://{flink_python_path}") return env def main() -> None: env: StreamExecutionEnvironment = initialize_environment() kafka_properties = { "bootstrap.servers": BROKERS, "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username='{USERNAME}' password='{PASSWORD}';", } kafka_source: KafkaSource = ( KafkaSource.builder() .set_topics(SOURCE_TOPIC) .set_properties(kafka_properties) .set_starting_offsets(KafkaOffsetsInitializer.earliest()) .set_value_only_deserializer(SimpleStringSchema()) .build() ) kafka_sink: KafkaSink = ( KafkaSink.builder() 
.set_bootstrap_servers(BROKERS) .set_record_serializer( KafkaRecordSerializationSchema.builder() .set_topic(DESTINATION_TOPIC) .set_value_serialization_schema(SimpleStringSchema()) .build() ) .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) data_stream = env.from_source( kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source", ) transformed_data = data_stream.map(parse_record, output_type=Types.STRING()) transformed_data.sink_to(kafka_sink) print("START EXECUTING!") env.execute() if __name__ == "__main__": main() ``` DOCKERFILE: ``` ############################################################################## # Stage 1 - Build uber jar using Maven ############################################################################## # Set arguments for image versions ARG JAVA_VERSION=11 ARG FLINK_VERSION=1.19 ARG SCALA_VERSION=scala_2.12 # Source build image FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build # Build jars WORKDIR /build COPY pom.xml . RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars ############################################################################## # Stage 2 - Create the final image ############################################################################## # Source image FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION} # Install Python RUN apt-get update && apt-get install -y python3.11 python3-pip # Copy the built jars from the Maven stage COPY --from=build /build/jars /opt/flink/lib # Create symlink so that "python" points to "python3.11" RUN ln -s /usr/bin/python3.11 /usr/bin/python # Set the working directory WORKDIR /opt/flink/src # Copy the Python project files COPY pyproject.toml . COPY README.md . COPY dataproducts /opt/flink/src/dataproducts # Go get a cup of coffee, this will take a while ☕ RUN python3.11 -m pip install . 
\ --no-cache-dir ``` STACK TRACE: ``` root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py WARNING: Unknown module: jdk.compiler specified to --add-exports WARNING: Unknown module: jdk.compiler specified to --add-exports WARNING: Unknown module: jdk.compiler specified to --add-exports WARNING: Unknown module: jdk.compiler specified to --add-exports WARNING: Unknown module: jdk.compiler specified to --add-exports flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar START EXECUTING! Traceback (most recent call last): File "/opt/flink/src/dataproducts/main.py", line 97, in <module> main() File "/opt/flink/src/dataproducts/main.py", line 93, in main env.execute() File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 813, in execute File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__ File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. 
: java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar'] at java.base/java.net.URL.<init>(Unknown Source) at java.base/java.net.URL.<init>(Unknown Source) at java.base/java.net.URL.<init>(Unknown Source) at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133) at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77) at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77) at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Unknown Source) org.apache.flink.client.program.ProgramAbortException: 
java.lang.RuntimeException: Python process exits with code: 1 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335) Caused by: java.lang.RuntimeException: Python process exits with code: 1 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124) ... 14 more ```