Hi Kirill,

Your understanding of checkpointing is very good. Just a couple of additions.
Incremental checkpoints usually help when only a small number of keys change between two checkpoints. AFAIK, in StateFun the keys correspond to function addresses (function name × id). So if you have few function addresses and all of them receive a state update between two checkpoints, an incremental checkpoint is even slower than a regular (full) checkpoint. You can usually see this quickly by looking at the checkpoint sizes: when incremental checkpoints pay off, a full checkpoint should be roughly an order of magnitude larger than an incremental one.

If you don't need incremental checkpoints, you could also switch to the heap-based (in-memory) state backend. All changed data is then kept in memory and written to durable storage on checkpoint. As long as the state fits into main memory, that is usually the faster option and worth trying out. 15 GB should easily fit on a single compute node, and writing that much to S3 is also fast (and asynchronous). The sync phase of that state backend is very fast.

Making checkpoints completely asynchronous is an ongoing effort in FLIP-158 [1], the first version of which is expected to appear in Flink 1.14 in 1-2 months.

[1] https://issues.apache.org/jira/browse/FLINK-21352

On Wed, Jul 28, 2021 at 4:09 PM Kirill Kosenko <kirill.kose...@transportexchangegroup.com> wrote:

> Hello,
>
> I'm new to Flink. I am playing with Stateful Functions and have a question about checkpoints and how they work.
>
> Some configuration details:
>
>   state.backend: rocksdb
>   state.backend.incremental: true
>   execution.checkpointing.mode: AT_LEAST_ONCE
>
> As far as I know:
>
> 1. There is a sync checkpoint phase. I suppose the memtable is flushed to SST files during this phase and a snapshot is taken afterwards. It appears to be a blocking operation.
> 2. If I got the idea right, the snapshot is then sent asynchronously to durable storage.
>
> Please confirm my understanding.
>
> I noticed that when a checkpoint is triggered there is a delay in message processing.
> Message processing usually takes less than 100 ms if a checkpoint hasn't been triggered yet. Unfortunately, after a checkpoint is triggered, processing the same message takes over 2 seconds. This doesn't match our expectations, as we need messages to be processed significantly faster (in real time), ideally in less than 1 second during a checkpoint.
>
> From what I noticed, the larger the state, the longer the checkpoint takes to make a snapshot, even with the incremental approach: if the state is about 100 MB, the checkpoint takes less than 1 second, which works for me. However, with 15 GB of state the checkpoint takes 3-5 seconds. I just want to be sure that state size increases the checkpoint time even though I use incremental checkpoints. Please confirm or disprove my understanding.
>
> Is there a way to speed up checkpoints, or alternatively to make them completely asynchronous?
>
> Thanks
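To make the state backend suggestion above a bit more concrete, a flink-conf.yaml fragment for the heap-based setup might look roughly like the following. This is only a sketch under assumptions: Flink 1.13 or later (where the heap-based backend is configured as `hashmap`; older releases use `filesystem`), an S3 filesystem plugin already installed, and a placeholder bucket path that you would need to replace. The TaskManagers would also need enough JVM heap to hold the roughly 15 GB of state.

```yaml
# Sketch only, assuming Flink 1.13+ (use "filesystem" instead of "hashmap" on older versions).
# Keep working state on the JVM heap instead of in RocksDB.
state.backend: hashmap
# Checkpoints are written to durable storage; the bucket/path below is a placeholder.
state.checkpoints.dir: s3://your-bucket/flink-checkpoints
# Unchanged from the original setup.
execution.checkpointing.mode: AT_LEAST_ONCE
# state.backend.incremental only affects RocksDB, so it can be dropped here.
```

Note that this trades incremental uploads for a full snapshot on every checkpoint, so it mainly pays off in the case described above, where most keys change between checkpoints anyway.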