Hello,

I have a question about what actually happens when a job is started from an
existing checkpoint, in particular when the parallelism has changed.

*Context:*
We have a Flink 1.11.2 (DataStream API) job running on Kubernetes (GCP)
writing its state to GCS.
Normally we run with 12 TMs, each with 3 CPU cores and about 12 GB of RAM.
We have quite a bit of state (stored with RocksDB): about 20-25 operators,
each holding between 20 GB and 180 GB of state. In total we have about
600 GB of state.
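For reference, the relevant parts of our configuration look roughly like
this (bucket name and exact values are illustrative placeholders, not
copied verbatim from our flink-conf.yaml):

```yaml
# flink-conf.yaml (illustrative)
state.backend: rocksdb
state.checkpoints.dir: gs://<our-bucket>/checkpoints
taskmanager.memory.process.size: 12g
```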

During normal operations this works fine; the only 'problem' we have is
that savepoints (both creating them and starting from them) are very slow.
Therefore we use externalized checkpoints to deploy new versions of our job.

*What is our problem?*
One of the things I am currently investigating is why rescaling our
job is so slow. The way we rescale is by cancelling the job and then
starting it with a higher parallelism, pointing to the previous
(externalized) checkpoint.
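Concretely, the rescale amounts to something like the following (job id,
jar name, checkpoint path, and the new parallelism are placeholders; the
-s and -p flags are the standard Flink CLI options):

```
# Cancel the running job; the last checkpoint is retained because we
# use externalized checkpoints.
flink cancel <job-id>

# Restart from the retained checkpoint with doubled parallelism:
# -s points at the checkpoint directory, -p sets the new parallelism.
flink run -s gs://<our-bucket>/checkpoints/<job-id>/chk-<n> -p <2x-old-parallelism> our-job.jar
```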

Without rescaling, for instance when deploying new code, starting a job
from a checkpoint would cause the first new checkpoint to complete in maybe
5 minutes.
However, if I double the parallelism, the first checkpoint takes an hour
or more to complete. This is troublesome because Kubernetes might
sometimes decide to restart a TM, causing a job restart and forcing us to
redo all the checkpoint work... (very annoying if this happens when the
checkpoint is about to finish.. :) )

*What happens during a checkpoint:*
Looking at metrics we can see:
 * CPU at 100%.
 * RAM swinging up and down depending on which operator is currently
checkpointing.
 * Network traffic to GCS peaking at 100 MB/s per TM (tests indicate the
network should not be the bottleneck).
 * Disk (SSD) IOPS in the order of 2,000-3,000, with spikes up to 10k
IOPS, not even close to capacity.

Now the obvious answer would be to add CPU. That does not really seem to
help, though, and we'd really like to avoid having to vertically scale
our job just to change parallelism, since during normal operations our
CPU usage is around 50-60%.

*Question:*
My question is:
What actually happens when Flink starts a new job from an existing
checkpoint? What extra work needs to be done because of a change in
parallelism? Is it 'normal' to incur this penalty for scaling up
or down?
Do you have any pointers where we should look to get better performance?

Thank you in advance :)

Richard
