Hi Siva.

I believe that when we built our Flink image (Flink 1.20.0 + several connectors +
our Python library code), we had to create a symbolic link:

ln -s /usr/bin/python3 /usr/bin/python

Try that. Or pass a full path in the -pyexec argument.
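
Roughly something like the following (the /usr/bin/python3 path is an assumption — check where the interpreter actually lives inside your container):

# In the Dockerfile used to build the Flink image:
RUN ln -s /usr/bin/python3 /usr/bin/python

# Or point -pyexec at the absolute interpreter path when submitting:
/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py -pyexec /usr/bin/python3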

Nix.

From: Siva Krishna <sivasoe...@gmail.com>
Date: Monday, February 17, 2025 at 6:13 AM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Bug in Flink 'run' for Python Job ?
Hi guys,
I am facing an issue running a Python job inside the Flink Docker container.
I have confirmed that Python is properly installed.
I am trying to submit the job with the command below:

/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py

No matter how many times I try, I get the error below:

Caused by: java.io.IOException: Cannot run program "python3": error=2, No such file or directory
        at java.base/java.lang.ProcessBuilder.start(Unknown Source)
        at java.base/java.lang.ProcessBuilder.start(Unknown Source)
        at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:176)
        at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:140)
        at org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:59)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:441)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:269)
        at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
        at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:126)
        at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
        at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:555)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:272)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117)
        at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: error=2, No such file or directory
        at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
        at java.base/java.lang.ProcessImpl.<init>(Unknown Source)
        at java.base/java.lang.ProcessImpl.start(Unknown Source)
        ... 27 more


I even tried /opt/flink/bin/flink run -py /opt/flink/flink_job/main.py -pyexec python3, and similar '-pyexec' variants.
Is this a bug in the Flink Docker image?
Below is my code:

from pyflink.common import Configuration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer


def main():
    # Define JAR paths
    jars = [
        "file:///opt/flink/flink_job/flink-connector-kafka-3.4.0-1.20.jar",
        "file:///opt/flink/flink_job/flink-clients-1.20.1.jar",
        "file:///opt/flink/flink_job/flink-streaming-java-1.20.1.jar",
        "file:///opt/flink/flink_job/flink-python-1.20.0.jar",
        "file:///opt/flink/flink_job/kafka-clients-3.6.2.jar"
    ]

    # Create Configuration and set JARs (pipeline.jars expects a semicolon-separated string of URLs)
    conf = Configuration()
    conf.set_string("pipeline.jars", ";".join(jars))
    # conf.set_string("python.executable", "/usr/bin/python3")  # Set Python executable path

    # Initialize Stream Execution Environment with the configuration
    env = StreamExecutionEnvironment.get_execution_environment(conf)
    env.set_parallelism(1)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    # Define Kafka Source
    source = KafkaSource.builder() \
        .set_bootstrap_servers("kafka:9092") \
        .set_topics("testing31-events") \
        .set_group_id("flink-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    data_stream = env.from_source(source, WatermarkStrategy.for_monotonous_timestamps(), "Kafka Source")

    # Process each Kafka message and map it to InfluxDB write
    data_stream.map(lambda msg: process_kafka_message(msg))

    env.execute("Kafka to InfluxDB Stream")


if __name__ == "__main__":
    main()

Please help me fix this.
Thanks,
Siva Rama Krishna.
