Hey team, I've been struggling a bit to get PyFlink running in a Docker container.
Here's my Python code (I redacted irrelevant code). This file is called `processor.py` (note: I've added the `Types` import below, which was missing from my original paste -- `Types.STRING()` is used in `process()`):

```python
from __future__ import annotations

import sys
from typing import List

from pyflink.common.typeinfo import Types  # needed by the .map() call below
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

from src import constants
from src.functions.spike_detector import SpikeDector


class FlinkProcessor:
    def download_jars(self) -> List[str]:
        ...
        return jar_file_paths

    def build_execution_environment(self):
        jars = self.download_jars()
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        env.set_parallelism(constants.FLINK_PARALLELISM)
        env.add_jars(*jars)
        print(f"Created env: {vars(env)}")
        return env

    def build_source_connector(self, env: StreamExecutionEnvironment):
        ...
        return data_stream

    def build_sink_connector(self):
        ...
        return kafka_sink

    def process(self):
        env = self.build_execution_environment()
        data_stream = self.build_source_connector(env=env)
        kafka_sink = self.build_sink_connector()

        data_stream \
            .process(SpikeDector()) \
            .map(lambda msg: msg, Types.STRING()) \
            .sink_to(kafka_sink) \
            .name(f"Price Spikes on {constants.PRICE_TOPIC_NAME}")

        print("Executing Flink Job..")
        env.execute("SpikeDetection")


if __name__ == "__main__":
    print(f"Running code using python version: {sys.version}")
    FlinkProcessor().process()
```

And here is my full Dockerfile:

```dockerfile
FROM flink:1.20.0-java17

# Install Python
RUN apt-get update && \
    apt-get install -y software-properties-common coreutils && \
    add-apt-repository ppa:deadsnakes/ppa && \
    apt-get update && \
    apt-get install -y python3.9 python3.9-venv python3.9-dev

# Point python to python3.9
RUN ln -s /usr/bin/python3.9 /usr/bin/python

# Install pip
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
RUN python get-pip.py --force-reinstall

# Install app dependencies
COPY requirements.txt /app/requirements.txt
RUN python -m pip install -r /app/requirements.txt

ARG FLINK_VERSION=1.20.0
ARG FLINK_SHORT_VERSION=1.20
ARG KAFKA_VERSION=3.4.0

# Download jar dependencies from Maven Repository into /tmp
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base/$FLINK_VERSION/flink-connector-base-$FLINK_VERSION.jar /tmp/flink-connector-base.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/$KAFKA_VERSION-$FLINK_SHORT_VERSION/flink-connector-kafka-$KAFKA_VERSION-$FLINK_SHORT_VERSION.jar /tmp/flink-connector-kafka.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-java/$FLINK_VERSION/flink-java-$FLINK_VERSION.jar /tmp/flink-java.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/$KAFKA_VERSION-$FLINK_SHORT_VERSION/flink-sql-connector-kafka-$KAFKA_VERSION-$FLINK_SHORT_VERSION.jar /tmp/flink-sql-connector-kafka.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-streaming-java/$FLINK_VERSION/flink-streaming-java-$FLINK_VERSION.jar /tmp/flink-streaming-java.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java/$FLINK_VERSION/flink-table-api-java-$FLINK_VERSION.jar /tmp/flink-table-api-java.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java-bridge/$FLINK_VERSION/flink-table-api-java-bridge-$FLINK_VERSION.jar /tmp/flink-table-api-java-bridge.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-python/$FLINK_VERSION/flink-python-$FLINK_VERSION.jar /tmp/flink-python.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-core/$FLINK_VERSION/flink-core-$FLINK_VERSION.jar /tmp/flink-core.jar
ADD https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.13/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.jar /tmp/kafka.jar
ADD https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/$KAFKA_VERSION/kafka-clients-$KAFKA_VERSION.jar /tmp/kafka-clients.jar

# Allow all access to files in /tmp
RUN chmod -R 777 /tmp

# Copy application code to container
COPY src /app/src
COPY scripts /app/scripts

WORKDIR /app

CMD ["sh", "/app/scripts/start-service.sh"]
```

My Dockerfile calls this
`start-service.sh` script (I'm hard-coding `CPU_COUNT` to 1 for testing):

```sh
/opt/flink/bin/start-cluster.sh

# CPU_COUNT=$(nproc --all)
CPU_COUNT=1
echo "CPU COUNT: $CPU_COUNT"

for i in $(seq 1 $CPU_COUNT)
do
    echo "Starting task manager #$i"
    /opt/flink/bin/taskmanager.sh start
done
echo "Started $CPU_COUNT task managers"

/opt/flink/bin/flink run --pyModule src.processor --pyFiles src
```

And these are the requirements I'm installing:

```
boto3==1.34.48
apache-flink==1.20.0
```

I'm on a Mac running Java 17, Python 3.9.6, and Flink 1.20.0. The code runs fine locally; however, once I put it in a Docker container, I get a `java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.20.0.jar']` error. The entire stack trace is below:

```
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/app/src/processor.py", line 104, in <module>
    FlinkProcessor().process()
  File "/app/src/processor.py", line 99, in process
    env.execute("SpikeDetection")
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.20.0.jar']
	at java.base/java.net.URL.<init>(Unknown Source)
	at java.base/java.net.URL.<init>(Unknown Source)
	at java.base/java.net.URL.<init>(Unknown Source)
	at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
	at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:192)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
	at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
	... 14 more
```

I tested this against some of the example files in the pyflink repo and got the same error for *pyflink/examples/datastream/connectors/kafka_json_format.py*. However, *pyflink/examples/datastream/process_json_data.py* seemed to work. I'm not sure what's causing the issue: everything runs fine locally, and it only breaks once I attempt to run from the Docker container. Any help would be tremendously appreciated!! And sorry for the large message.
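One thing I did notice while re-reading the error: the "URL" that `MalformedURLException` complains about is wrapped in square brackets and single quotes, which looks like the `repr` of a one-element Python list rather than a plain URL string. A quick sanity check (plain Python, no Flink involved) reproduces the exact text from the error:

```python
# The error says:  no protocol: ['file:/opt/flink/opt/flink-python-1.20.0.jar']
# Stringifying a one-element list of that path produces exactly that text:
jar_urls = ["file:/opt/flink/opt/flink-python-1.20.0.jar"]
print(str(jar_urls))  # -> ['file:/opt/flink/opt/flink-python-1.20.0.jar']
```

So my guess is that somewhere a list is being stringified into the jars config that `ExecutionConfigAccessor.getJars` reads (it fails in `ConfigUtils.decodeListFromConfig` per the trace), instead of its elements being added individually -- but I don't reference `flink-python-1.20.0.jar` anywhere myself, so I don't know where that would happen.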
It's my first time posting for code help online, so I hope I'm not breaking any rules :)
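P.S. In case the jar path format matters: a simplified, hypothetical sketch of the shape of what my (redacted) `download_jars()` returns -- absolute `file://` URLs, which is the format I believe `add_jars` expects:

```python
# Hypothetical example of download_jars() output (real code redacted) --
# absolute file:// URLs pointing at the jars the Dockerfile puts in /tmp:
jar_file_paths = [
    "file:///tmp/flink-connector-base.jar",
    "file:///tmp/flink-connector-kafka.jar",
]
print(jar_file_paths)
```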