Dear community,

I have been struggling a lot with the deployment of my PyFlink job.
Moreover, the performance seems to be very disappointing especially the
low-throughput latency. I have been playing around with configuration
values, but it has not been improving.
In short, I have a Datastream job with multiple Python operators including
a ProcessFunction. The job reads from Kafka and writes to Kafka again. For
single events, E2E latency has been somewhere between 600ms and 2000ms.
When I'm increasing throughput, latency becomes in the order of seconds.
This is when I configure my job like this
        config.set_integer("python.fn-execution.bundle.time", 1)
        config.set_integer("python.fn-execution.bundle.size", 1)
I tried several configuration values, but the results are similar.
Interestingly, I have a similar Python streaming application written in
Apache Beam which does have low-latency, single events are processed <
30ms.  If I recall correctly, they use the same technique with bundling and
sending to Python processes.
On the other hand, Beam uses an in-memory runner when running locally which
might change the situation. I'm not sure how that compares to a local Flink
MiniCluster.

I hoped that performance might improve when I deploy this on a (remote)
Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink
job to a remote Flink cluster. In my first attempt, I created a local TM +
JM setup and tried to deploy it using the "./flink run" command.
However, this command created a local MiniCluster again rather than
submitting it to my remote cluster. The full command was:
./flink run --target remote \
-m localhost:8081 \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 4 \
--python ~/Documents/runtime.py \
--jarfile ~/Documents/combined.jar

Note that venv.zip stores all the Python dependencies for my PyFlink job
whereas combined.jar stores the Java dependencies. I tried several variants
of this command, but it *never *submitted to my running JobManager and
always ran it locally.
In my second attempt, I tried deploying it to a Kubernetes cluster using
the following command:

./flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dkubernetes.service-account=flink-service-account \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=pyflink:latest \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 4 \
-py ~/Documents/runtime.py \
--jarfile ~/Documents/combined.jar

I created the pyflink:latest image by following the documentation here [1]
It was unclear to me if had to include my project files in this Docker
image.
When running it like this, it did submit it to the remote K8s cluster but I
got an exception that it could not find my runtime.py file in some sort of
tmp folder.

Lastly, I wondered if it is possible to set a key for events send to the
KafkaProducer. Right now, it seems you can only configure some (static)
properties and the serializer.
Is there are a workaround to be able to set the key and value of an event
in PyFlink?

I hope you can help me out with my struggles! Thanks in advance.

Regards,
Wouter

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python

Reply via email to