Hey Team,

I've been struggling a bit to get PyFlink running in a Docker container.

Here's my Python code (irrelevant code redacted). The file is called
processor.py:
from __future__ import annotations
import sys
from typing import List
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from src import constants
from src.functions.spike_detector import SpikeDector


class FlinkProcessor:

    def download_jars(self) -> List[str]:
        ...
        return jar_file_paths

    def build_execution_environment(self):
        jars = self.download_jars()
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        env.set_parallelism(constants.FLINK_PARALLELISM)
        env.add_jars(*jars)
        print(f"Created env: {vars(env)}")
        return env

    def build_source_connector(self, env: StreamExecutionEnvironment):
        ...
        return data_stream

    def build_sink_connector(self):
        ...
        return kafka_sink

    def process(self):
        env = self.build_execution_environment()
        data_stream = self.build_source_connector(env=env)
        kafka_sink = self.build_sink_connector()
        data_stream\
            .process(SpikeDector())\
            .map(lambda msg: msg, Types.STRING())\
            .sink_to(kafka_sink)\
            .name(f"Price Spikes on {constants.PRICE_TOPIC_NAME}")
        print("Executing Flink Job..")
        env.execute("SpikeDetection")


if __name__ == "__main__":
    print(f"Running code using python version: {sys.version}")
    FlinkProcessor().process()
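
Since download_jars is redacted above, here's a minimal, hypothetical sketch of the shape I believe add_jars expects: each jar as its own "file://" URL string, passed as separate arguments rather than as a single list. The paths and the helper name below are placeholders, not my real code:

```python
from typing import List

def download_jars_sketch() -> List[str]:
    # Hypothetical: local paths where the jars were saved (e.g. via boto3)
    local_paths = [
        "/tmp/flink-connector-base.jar",
        "/tmp/flink-connector-kafka.jar",
    ]
    # Build one "file://" URL per jar; the caller then unpacks the list
    # with env.add_jars(*jars) so each URL is a separate argument
    return [f"file://{path}" for path in local_paths]

print(download_jars_sketch())
```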



And here is my full Dockerfile config:
FROM flink:1.20.0-java17

# Install Python
RUN apt-get update && \
    apt-get install -y software-properties-common coreutils && \
    add-apt-repository ppa:deadsnakes/ppa && \
    apt-get update && \
    apt-get install -y python3.9 python3.9-venv python3.9-dev

# Point python to python3.9
RUN ln -s /usr/bin/python3.9 /usr/bin/python

# Install pip
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
RUN python get-pip.py --force-reinstall

# Install app dependencies
COPY requirements.txt /app/requirements.txt
RUN python -m pip install -r /app/requirements.txt

ARG FLINK_VERSION=1.20.0
ARG FLINK_SHORT_VERSION=1.20
ARG KAFKA_VERSION=3.4.0

# Download jar dependencies from Maven Repository into /tmp
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base/$FLINK_VERSION/flink-connector-base-$FLINK_VERSION.jar /tmp/flink-connector-base.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/$KAFKA_VERSION-$FLINK_SHORT_VERSION/flink-connector-kafka-$KAFKA_VERSION-$FLINK_SHORT_VERSION.jar /tmp/flink-connector-kafka.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-java/$FLINK_VERSION/flink-java-$FLINK_VERSION.jar /tmp/flink-java.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/$KAFKA_VERSION-$FLINK_SHORT_VERSION/flink-sql-connector-kafka-$KAFKA_VERSION-$FLINK_SHORT_VERSION.jar /tmp/flink-sql-connector-kafka.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-streaming-java/$FLINK_VERSION/flink-streaming-java-$FLINK_VERSION.jar /tmp/flink-streaming-java.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java/$FLINK_VERSION/flink-table-api-java-$FLINK_VERSION.jar /tmp/flink-table-api-java.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java-bridge/$FLINK_VERSION/flink-table-api-java-bridge-$FLINK_VERSION.jar /tmp/flink-table-api-java-bridge.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-python/$FLINK_VERSION/flink-python-$FLINK_VERSION.jar /tmp/flink-python.jar
ADD https://repo1.maven.org/maven2/org/apache/flink/flink-core/$FLINK_VERSION/flink-core-$FLINK_VERSION.jar /tmp/flink-core.jar
ADD https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.13/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.jar /tmp/kafka.jar
ADD https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/$KAFKA_VERSION/kafka-clients-$KAFKA_VERSION.jar /tmp/kafka-clients.jar

# Allow all access to files in /tmp
RUN chmod -R 777 /tmp

# Copy application code to container
COPY src /app/src
COPY scripts /app/scripts

WORKDIR /app

CMD ["sh", "/app/scripts/start-service.sh"]



My Dockerfile calls this start-service.sh script (I'm hard-coding CPU_COUNT
to 1 for testing):
/opt/flink/bin/start-cluster.sh

# CPU_COUNT=$(nproc --all)
CPU_COUNT=1

echo "CPU COUNT: $CPU_COUNT"

for i in $(seq 1 $CPU_COUNT)
do
    echo "Starting task manager #$i"
    /opt/flink/bin/taskmanager.sh start
done

echo "Started $CPU_COUNT task managers"

/opt/flink/bin/flink run --pyModule src.processor --pyFiles src


And these are my requirements:
boto3==1.34.48
apache-flink==1.20.0

I'm on a Mac running Java 17, Python 3.9.6, and Flink 1.20.0. I can run my
code locally; however, once I put the code in a Docker container, I get a
"java.net.MalformedURLException: no protocol:
['file:/opt/flink/opt/flink-python-1.20.0.jar']" error. The full stack
trace is below:
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/app/src/processor.py", line 104, in <module>
    FlinkProcessor().process()
  File "/app/src/processor.py", line 99, in process
    env.execute("SpikeDetection")
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.20.0.jar']
        at java.base/java.net.URL.<init>(Unknown Source)
        at java.base/java.net.URL.<init>(Unknown Source)
        at java.base/java.net.URL.<init>(Unknown Source)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:192)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more


I tested this against some of the example files in the PyFlink repo, and I
got the same error for
pyflink/examples/datastream/connectors/kafka_json_format.py.

However, pyflink/examples/datastream/process_json_data.py ran fine.

I'm not sure what is causing the issue; everything runs fine locally, and I
only hit this once I attempt to run from a Docker container.
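
For what it's worth, the quoted error value ['file:/...'] looks like the string form of a Python list rather than a bare URL, which makes me suspect a stringified list is ending up in the jar configuration somewhere. A small standalone illustration of what I mean (no PyFlink needed; the variable names are mine):

```python
# The exception quotes "['file:/...']" -- a Python list repr, not a URL.
jar = "/opt/flink/opt/flink-python-1.20.0.jar"

good = f"file://{jar}"       # one URL string per jar
bad = str([f"file:{jar}"])   # a stringified list -> "no protocol: ['file:...']"

print(good)
print(bad)
```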

Any help would be tremendously appreciated!!

And sorry for the large message. It's my first time posting for code help
online, so I hope I'm not breaking any rules :)
