Hi Pachara and Nikola,

As Nikola said, starting a FlinkSessionJob and a FlinkDeployment is vastly
different under the hood for the operator.

For FlinkDeployments, I think it's quite straightforward: job submission
is handled entirely by the Flink pod, which fetches the jar, and the job
arguments are parsed by the `CliFrontend` class on startup.
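
For reference, the operator repository ships a PyFlink example for a
FlinkDeployment along roughly these lines (a sketch from memory; the jar
path, Python binary location, and script path are assumptions, so please
double-check them against your image):

```yaml
# Application-mode FlinkDeployment job section, modeled on the operator's
# flink-python-example. Paths below are assumptions, verify them against
# your own image.
spec:
  job:
    # local:// works here because the Flink pod itself resolves the jar.
    jarURI: local:///opt/flink/opt/flink-python-1.20.1.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    # In this mode CliFrontend parses the arguments, so -pyclientexec
    # is accepted here.
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/my_job.py"]
    parallelism: 2
    upgradeMode: stateless
```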

For FlinkSessionJobs however, the jar file is downloaded by the operator
[1], and argument parsing is handled by the Flink entrypoint class, which
in your case is going to be
`org.apache.flink.client.python.PythonDriver` [2], which accepts only 2
options [3]. Because of this, you will also have to remove the first two
arguments from your specification: "-pyclientexec", "/usr/bin/python3".
Also, if you use the PythonDriver class as the entrypoint, you can use any
jarURI, as that class is already on the classpath of the official Docker
images. In my tests, I could even leave jarURI completely empty.
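
Putting that together, a FlinkSessionJob spec along these lines should get
further (an untested sketch; the Maven jarURI is the one you already tried,
and it only needs to be fetchable, since the class itself is loaded from
the image classpath):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: transform-confirmed-trade-job
  namespace: namespace-b
spec:
  deploymentName: flink-cluster
  job:
    # Any jar the operator can fetch works; PythonDriver is loaded from
    # the image classpath anyway.
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-python/1.20.1/flink-python-1.20.1.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    # PythonDriver only understands --python / --pyModule, so
    # "-pyclientexec" has to go.
    args: ["--python", "/flink-tmp/transform_confirmed_trade.py"]
    parallelism: 2
    upgradeMode: stateless
```

Note that the Python script path still has to be readable by the cluster
pods (your EFS mount), since that is where the Python process runs.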

Best regards,
Mate

[1]
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java
[2]
https://github.com/apache/flink/blob/4e6dc79e785ff18920c15a38ba50f57111bf7623/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java#L56
[3]
https://github.com/apache/flink/blob/330aae0c6e0811f50888d17830f10f7a29efe7d7/flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptionsParserFactory.java#L41

On Mon, Jul 7, 2025 at 9:39 AM Nikola Milutinovic <n.milutino...@levi9.com>
wrote:

> Hi Pachara.
>
>
>
> We are also struggling to get a PyFlink job to run. Hitting some other
> wall, but I would like to share some thoughts here.
>
>
>
> At the moment, we are launching our fleet of jobs using a specially
> crafted Docker image: Flink 1.20.1 + plugins + PyFlink + our Python libs +
> our jobs. From that image we are invoking “flink run -py … -pyFile …”. So,
> that image is running as a Flink client and it interacts with Flink Session
> Cluster.
>
>
>
> The reason why I am mentioning all of this is that – I suspect – under
> the covers, the Flink K8s Operator works in a similar fashion. When you
> create a K8s Custom Resource, the Operator will act as a client and it
> will try to read the files you mention.
>
>
>
> And there lies the problem. file:///flink-tmp/... is a reference to a
> local file, but local to what? The Kubernetes operator, I suspect. That
> is why some examples show the following:
>
>
>
> spec:
>     job:
>         jarURI: local:///opt/flink/lib/flink-python-1.20.1.jar
>         …
>
>
> This fails for us, stating that “local” is not a recognized URI scheme.
> Never mind, we skipped it (and hit another wall).
>
>
>
> So, you could try putting your Python files on some global storage,
> like S3, and see if that works. If it does, please tell us.
>
>
>
> Nikola.
>
>
>
> *From: *Pachara Aryuyuen (Kas) <paryuy...@zilo.co.uk>
> *Date: *Monday, July 7, 2025 at 8:38 AM
> *To: *user@flink.apache.org <user@flink.apache.org>
> *Cc: *Santhana Jinjakam (Eye) <sjinja...@zilo.co.uk>
> *Subject: *[PyFlink] Issue Deploying FlinkSessionJob with PVC and Python
> Script Access
>
> Dear Flink Community,
>
> I’m currently working on deploying a *FlinkSessionJob* using *PyFlink*
> and have encountered an issue I hope someone can help clarify.
>
> Here’s my setup:
>
>    - I installed the *Flink Kubernetes Operator* in *namespace A*
>    - I deployed a *FlinkDeployment* in *namespace B*
>    - I’m trying to deploy a *FlinkSessionJob* in *namespace B*
>
> The job fails with a file not found error related to the jarURI. I’m
> referencing a JAR file located at /flink-tmp/, which is mounted as a *PVC*
> on the FlinkDeployment. This PVC is backed by *EFS*, so it should be
> accessible across pods. I’ve confirmed that the file exists at that path.
>
> I also tried using the following public JAR instead:
>
> jarURI:
> https://repo1.maven.org/maven2/org/apache/flink/flink-python/1.20.1/flink-python-1.20.1.jar
>
> However, even with this change, the Python script located in the same
> mounted volume (/flink-tmp/transform_confirmed_trade.py) still cannot be
> found.
>
> I haven’t been able to find any best practices or working examples of
> *FlinkSessionJob with PyFlink*. Do you have any sample manifests or
> guidance for this use case?
>
> Here is the manifest I’m using:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: transform-confirmed-trade-job
>   namespace: namespace-b
> spec:
>   deploymentName: flink-cluster
>   job:
>     jarURI: file:///flink-tmp/flink-python-1.20.1.jar
>     entryClass: "org.apache.flink.client.python.PythonDriver"
>     args: ["-pyclientexec", "/usr/bin/python3", "--python",
> "/flink-tmp/transform_confirmed_trade.py"]
>     parallelism: 2
>     upgradeMode: stateless
> *Additional Question:*
>
> Is it possible to configure *TaskManagers* to run *permanently* (i.e.,
> not shut down when idle)? If so, what configuration options are required to
> achieve this behavior?
>
> Any help or pointers would be greatly appreciated.
>
> Best regards,
>
> Pachara Aryuyuen (Kas)
>
> Cloud Platform Engineer
>
>
>