Hello, I have a question about what actually happens when a job is started from an existing checkpoint, in particular when the parallelism has changed.
*Context:* We have a Flink 1.11.2 (DataStream API) job running on Kubernetes (GCP), writing its state to GCS. Normally we run with 12 TMs, each with 3 CPU cores and about 12 GB of RAM. We have quite a bit of state (stored with RocksDB): about 20-25 operators with state ranging from 20 GB to 180 GB per operator, roughly 600 GB in total. During normal operations this works fine; the only 'problem' we have is that savepoints (both creating them and starting from them) are very slow. Therefore we use external checkpoints to deploy new versions of our job (see the P.S. at the bottom for a sketch of our checkpoint setup).

*What is our problem?* One of the things I am currently trying to investigate is why rescaling our job is so slow. The way we rescale is by cancelling the job and then starting it with a higher parallelism, whilst pointing to the previous (external) checkpoint. Without rescaling, for instance when deploying new code, starting the job from a checkpoint would let the first new checkpoint complete in maybe 5 minutes. However, if I double the parallelism, the first checkpoint takes an hour or more to complete. This is troublesome because Kubernetes might sometimes decide to restart a TM, causing a job restart and thus having to redo all the checkpoint work (very annoying if this happens when the checkpoint is about to finish.. :) ).

*What happens during a checkpoint:* Looking at metrics we can see:
* CPU at 100%.
* RAM swinging up and down depending on which operator is currently checkpointing.
* Network traffic to GCS peaking at 100 MB/s per TM (tests indicate the network should not be a bottleneck).
* Disk (SSD) IOPS in the order of 2,000-3,000, with spikes up to 10k, nowhere near capacity.

Now the obvious answer would be to increase the CPU. This does not really seem to help, though, and we'd really like to avoid having to vertically scale our job just to change parallelism, as during normal operations our CPU usage is around 50-60%.

*Question:* My question is: what actually happens when Flink starts a new job from an existing checkpoint? What extra work needs to be done because of a change in parallelism? Is it 'normal' that we incur this penalty for scaling up or down? Do you have any pointers on where we should look to get better performance?

Thank you in advance :)

Richard
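P.S. In case it helps, this is roughly how our checkpointing is set up. It's a minimal sketch rather than our actual code: the bucket path, checkpoint interval, and the incremental flag are placeholders.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 minutes (placeholder interval), exactly-once mode.
        env.enableCheckpointing(10 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

        // Retain completed checkpoints when the job is cancelled, so we can
        // restart from them -- this is what makes them 'external' checkpoints.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // RocksDB state backend writing to GCS; the second argument ('true')
        // enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("gs://our-bucket/checkpoints", true));

        // ... job graph goes here ...

        env.execute("our-job");
    }
}
```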