Dear community,

I have been struggling a lot with the deployment of my PyFlink job, and on top of that its performance has been very disappointing, especially the latency at low throughput. I have been playing around with configuration values, but it has not improved. In short, I have a DataStream job with multiple Python operators, including a ProcessFunction. The job reads from Kafka and writes to Kafka again. For single events, E2E latency has been somewhere between 600ms and 2000ms, and when I increase the throughput, latency grows to the order of seconds. This is when I configure my job like this:

config.set_integer("python.fn-execution.bundle.time", 1)
config.set_integer("python.fn-execution.bundle.size", 1)

I tried several configuration values, but the results are similar. Interestingly, I have a similar Python streaming application written in Apache Beam which does have low latency: single events are processed in < 30ms. If I recall correctly, it uses the same technique of bundling events and handing them to Python processes. On the other hand, Beam uses an in-memory runner when running locally, which might change the situation; I'm not sure how that compares to a local Flink MiniCluster.
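For context, the job is structured roughly like the sketch below. This is a heavily simplified version: the topic names, bootstrap servers, and the body of the process function are placeholders, and passing the Configuration straight into the environment may depend on the PyFlink version (otherwise those options go into flink-conf.yaml or are passed with -D on submission).

from pyflink.common import Configuration
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ProcessFunction
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

class MyProcessFunction(ProcessFunction):
    def process_element(self, value, ctx):
        # placeholder for the real per-event logic
        yield value

config = Configuration()
config.set_integer("python.fn-execution.bundle.time", 1)
config.set_integer("python.fn-execution.bundle.size", 1)
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(4)
# the Kafka connector classes come from combined.jar (~/Documents in my case)
env.add_jars("file:///path/to/combined.jar")

kafka_props = {"bootstrap.servers": "localhost:9092", "group.id": "pyflink-job"}
source = FlinkKafkaConsumer("input-topic", SimpleStringSchema(), kafka_props)
sink = FlinkKafkaProducer("output-topic", SimpleStringSchema(), kafka_props)

env.add_source(source) \
   .process(MyProcessFunction(), output_type=Types.STRING()) \
   .add_sink(sink)
env.execute("pyflink-latency-test")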
I hoped that performance might improve when I deploy this on a (remote) Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink job to a remote Flink cluster.

In my first attempt, I created a local TM + JM setup and tried to deploy it using the "./flink run" command. However, this command created a local MiniCluster again rather than submitting it to my remote cluster. The full command was:

./flink run --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  -pyexec venv.zip/venv/bin/python \
  --parallelism 4 \
  --python ~/Documents/runtime.py \
  --jarfile ~/Documents/combined.jar

Note that venv.zip stores all the Python dependencies for my PyFlink job, whereas combined.jar stores the Java dependencies. I tried several variants of this command, but it *never* submitted to my running JobManager and always ran locally.

In my second attempt, I tried deploying it to a Kubernetes cluster using the following command:

./flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=flink-cluster \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dkubernetes.service-account=flink-service-account \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=pyflink:latest \
  -pyarch venv.zip \
  -pyexec venv.zip/venv/bin/python \
  --parallelism 4 \
  -py ~/Documents/runtime.py \
  --jarfile ~/Documents/combined.jar

I created the pyflink:latest image by following the documentation here [1]. It was unclear to me whether I had to include my project files in this Docker image. When running it like this, it did submit the job to the remote K8s cluster, but I got an exception that it could not find my runtime.py file in some sort of tmp folder.

Lastly, I wondered if it is possible to set a key for events sent to the KafkaProducer. Right now, it seems you can only configure some (static) properties and the serializer. Is there a workaround to be able to set the key and value of an event in PyFlink? A rough sketch of my current sink setup is included below the footnote.

I hope you can help me out with my struggles! Thanks in advance.

Regards,
Wouter

[1] - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
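For completeness, the Kafka sink is currently wired up roughly like this (again just a sketch; the topic name and broker address are placeholders, and the Kafka connector jar has to be on the classpath). As far as I can tell, this only lets me pass static producer properties and a serializer for the value, with no hook for a per-record key:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors import FlinkKafkaProducer

# static producer properties plus a value-only serializer; no place to set a record key
producer_config = {"bootstrap.servers": "localhost:9092"}
sink = FlinkKafkaProducer("output-topic", SimpleStringSchema(), producer_config)
# processed_stream.add_sink(sink)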