Hi guys,
I am facing an issue running a Python job inside the Flink Docker
container.
I have confirmed that Python is properly installed.
I am submitting the job with the command below:

/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py

No matter how many times I try, I get the error below:

Caused by: java.io.IOException: Cannot run program "python3": error=2, No
such file or directory
        at java.base/java.lang.ProcessBuilder.start(Unknown Source)
        at java.base/java.lang.ProcessBuilder.start(Unknown Source)
        at
org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:176)
        at
org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:140)
        at
org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:59)
        at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:441)
        at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:269)
        at
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
        at
org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:126)
        at
org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
        at
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:555)
        at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:272)
        at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
        at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85)
        at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117)
        at
org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64)
        at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
        at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: error=2, No such file or directory
        at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
        at java.base/java.lang.ProcessImpl.<init>(Unknown Source)
        at java.base/java.lang.ProcessImpl.start(Unknown Source)
        ... 27 more


I even tried /opt/flink/bin/flink run -py /opt/flink/flink_job/main.py
-pyexec python3, and similar -pyexec variations.
Is this a bug in the Flink Docker image?

Below is my code:

from pyflink.common import Configuration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer


def main():
    # Define JAR paths
    jars = [
        "file:///opt/flink/flink_job/flink-connector-kafka-3.4.0-1.20.jar",
        "file:///opt/flink/flink_job/flink-clients-1.20.1.jar",
        "file:///opt/flink/flink_job/flink-streaming-java-1.20.1.jar",
        "file:///opt/flink/flink_job/flink-python-1.20.0.jar",
        "file:///opt/flink/flink_job/kafka-clients-3.6.2.jar"
    ]

    # Create Configuration and set JARs (pipeline.jars expects a
    # semicolon-separated string, not the repr of a Python list)
    conf = Configuration()
    conf.set_string("pipeline.jars", ";".join(jars))
    # conf.set_string("python.executable", "/usr/bin/python3")  # Set Python executable path

    # Initialize Stream Execution Environment with the configuration
    env = StreamExecutionEnvironment.get_execution_environment(conf)
    env.set_parallelism(1)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    # Define Kafka Source
    source = KafkaSource.builder() \
        .set_bootstrap_servers("kafka:9092") \
        .set_topics("testing31-events") \
        .set_group_id("flink-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    data_stream = env.from_source(
        source, WatermarkStrategy.for_monotonous_timestamps(), "Kafka Source")

    # Process each Kafka message and map it to InfluxDB write
    data_stream.map(lambda msg: process_kafka_message(msg))

    env.execute("Kafka to InfluxDB Stream")


if __name__ == "__main__":
    main()
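For reference, this is roughly how I checked that Python is visible inside the container (a minimal sketch using shutil.which; "python3" is the exact executable name Flink forks, per the stack trace, so that name must resolve on the PATH of the TaskManager process):

```python
import shutil
import sys


def interpreter_paths(names=("python", "python3")):
    """Return the resolved PATH location for each interpreter name (None if absent)."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    # Flink's ProcessPythonEnvironmentManager forks "python3" (see the stack
    # trace), so that exact name must resolve inside the TaskManager container,
    # not just in the container where "flink run" is executed.
    for name, path in interpreter_paths().items():
        print(f"{name}: {path or 'NOT FOUND on PATH'}")
    print("current interpreter:", sys.executable)
```

Running this via the same entry point the TaskManager uses would show whether "python3" is actually reachable there.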

Please help me fix this.

Thanks,
Siva Rama Krishna.
