Hi! Thanks for the clarification.
Could you expand checkpoint #195 and #196 for details? Slow end-to-end checkpoint time may be caused by various reasons, for example if the data processing is slow it will backpressure the checkpoint (if that is the case, alignment duration should be high for some vertices), or it is possible that JVM is under heavy GC at that time. By "increase the parallelism" I mean to increase the parallelism of the Flink job. If you double the parallelism each subtask will only have to deal with half of the data before and this should speed up data processing and checkpointing. state.backend.rocksdb.checkpoint.transfer.thread.num might help but for a ~300MB checkpoint I guess we do not need to tune this. Vidya Sagar Mula <mulasa...@gmail.com> 于2021年12月7日周二 15:22写道: > Hi Caizhi, > > Thank you for your response. > I am attaching 3 screen shots for better understanding. Can you please > take a look. > Screenshot 1: Flink UI with the checkpoiting history (Size and time taken > are displayed) > Screenshot 2: Flink UI for Task managers. I have one TM. > Screenshot 3: Grafana UI for CPU and memory utilization. > > For point 1: It is incremental checkpointing. However, I am making my > input data in such a way that the keys in the data records are not the > same. So, it is going to be full checkpointing size. > > > For point 2: If you notice the screen shot with Flink UI, when the > checkpointing size is reached to 267MB, the time taken is almost 4 mins, > which is definitely not expected. You mentioned increasing the parallelism. > Can you please explain a little bit more on this? > There is a RocksDB configurable parameter " > *state.backend.rocksdb.checkpoint.transfer.thread.num"* (default=1). Are > you referring to this parameter to increase the parallelism. Can you please > elaborate on this? > > > And also, there are some tunable RocksDB parameters as mentioned below. I > am currently using the default values. Do I need to set some higher values > to increase the checkpoint size with the increased input data size. > > Can you please clarify if I need to tune up any of the configurable > paremeters to increase the checkpointing size and to reduce the time taken > for each checkpointing. > > *state.backend.rocksdb.checkpoint.transfer.thread.num (default to 1)* > > *state.backend.rocksdb.writebuffer.size (RocksDB defaults to 4MB)* > > state.backend.rocksdb.writebuffer.count (RocksDB defaults to 2) > > state.backend.rocksdb.writebuffer.number-to-merge (RocksDB defaults to 1) > > *state.backend.rocksdb.block.blocksize (RocksDB defaults to 4KB)* > > state.backend.rocksdb.block.cache-size ( RocksDB defaults to 8MB) > > > > > On Mon, Dec 6, 2021 at 6:05 PM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> the checkpointing size is not going beyond 300 MB >> >> >> Is 300MB the total size of checkpoint or the incremental size of >> checkpoint? If it is the latter one, Flink will only store necessary >> information (for example the keys and the fields that are selected) in >> checkpoint and it is compressed, so for 3~5GB input records it is >> reasonable for the incremental checkpoint size to shrink to ~300MB. Of >> course this depends on the detailed workflow. >> >> there must be something bottleneck with Flink RocksDB configurations. >>> >> >> By "bottleneck" do you mean the checkpoint speed is too slow? If yes you >> can try to increase the parallelism of the job so that there will be less >> burden on each parallelism when making checkpoints. >> >> Vidya Sagar Mula <mulasa...@gmail.com> 于2021年12月7日周二 08:04写道: >> >>> Hi, >>> >>> In my project, we are trying to configure the "Incremental >>> checkpointing" with RocksDB in the backend. >>> >>> We are using Flink 1.11 version and RockDB with AWS : S3 backend >>> >>> Issue: >>> ------ >>> In my pipeline, my window size is 5 mins and the incremental >>> checkpointing is happening for every 2 mins. >>> I am pumping the data in such a way that the keys are not the same for >>> each record. That means, the incremental checkpointing size should keep >>> increasing for each checkpoint. >>> >>> So, the expectation here is that the size of the checkpointing should >>> reach atleast 3-5 GB with the amount of the data pumped in. >>> >>> However, the checkpointing size is not going beyond 300 MB and that too >>> it is taking around 2 mins duration for taking this 300 MB checkpoint. >>> >>> In my set up, I am using >>> >>> Cluster: Cloud cluster with instance storage. >>> Memory : 20 GB, >>> Heap : 10 GB >>> Flink Memory: 4.5 GB >>> Flink Version : 1.11 >>> Back end: RocksDB with AWS S3 backend >>> >>> >>> I would feel that, there must be something bottleneck with Flink RocksDB >>> configurations. >>> Can you please advise me? >>> >>> Thanks, >>> >>> >>> >>> >>>