Hi Robin,

You said that during the async phase of the checkpoint the CPU is stable at 100%, which seems strange to me. Normally, the CPU usage of the TaskManager process can exceed 100%, depending on what all the threads are doing. I'm wondering whether there is any scheduling mechanism controlling the CPU usage of the process in your setup, such as CGroups under YARN or Kubernetes. In that case, the uploading threads may be preempting CPU from the task-processing threads.

The second thing that might help is checking the I/O utilization during the checkpoint. The uploading threads keep reading from the local disk and writing to remote storage, which can affect I/O and state-access latency, especially when the state is large.
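If the uploading threads do turn out to be competing with the task threads, one knob you could experiment with is the number of threads RocksDB uses to transfer checkpoint files. A rough sketch for flink-conf.yaml (assuming you are on a Flink version that exposes this option; the right value depends on your state size and network):

# Threads used per stateful operator to upload/download RocksDB
# checkpoint files; fewer threads stretch the async phase but leave
# more CPU and disk bandwidth for the task-processing threads.
state.backend.rocksdb.checkpoint.transfer.thread.num: 2

This is only worth trying if the uploads and the task threads are really fighting for the same cores, so I'd check the CGroup limits and the I/O numbers first.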
Best,
Zakelly

On Tue, Oct 25, 2022 at 12:10 AM Robin Cassan via user <user@flink.apache.org> wrote:
>
> Hello Yuan Mei! Thanks a lot for your answer :)
>
> About the CPU usage, it is pretty stable at 80% normally. Every 15 minutes we trigger a checkpoint, and during that time it is stable at 100%.
> I am starting to wonder if CPU is the real limiting factor, because when checking the Flink UI I see that most of the checkpoint duration is async. I do not know how the async phase affects backpressure, but it does look like the upload-to-S3 phase is causing the backpressure. The sync phase is quite short as well.
> Looking at this article https://flink.apache.org/2022/05/23/latency-part2.html it seems we are already in the most efficient configuration (at-least-once, non-concurrent checkpointing, RocksDB on local NVMe SSDs...), so I don't see an obvious quick win apart from scaling up the full cluster.
>
> Reducing the state size will be a big challenge, and even then it would not guarantee consistent latency; the same goes for less frequent checkpoints.
> For now it looks like our only option to achieve real-time computation would be not to use Flink (or at least, not to include these computations in a job with a big checkpointed state). Thanks again for the insight, and if you happen to have any information on how we could prevent the async phase of checkpoints from adding backpressure on our stream, I would be very interested!
>
> On Wed, Oct 19, 2022 at 10:57 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
>>
>> Hey Robin,
>>
>> Thanks for sharing the detailed information. May I ask, when you say "CPU usage is around 80% when checkpoints aren't running, and capped at 100% when they are", do you see a zigzag pattern of CPU usage, or does it stay capped at 100%?
>>
>> I think one possibility is that the sync phase of the checkpoint (the write buffer flush during the sync phase) triggers a RocksDB compaction; we have seen this happen on Ververica services as well.
>>
>> For the moment, you could try making checkpoints less frequent (increase the checkpoint interval) to reduce the frequency of compaction. Please let me know whether this helps.
>>
>> In the long term, I think we probably need to separate the compaction process from the internal DB and control/schedule the compaction ourselves (compaction takes a good amount of CPU and reduces TPS).
>>
>> Best,
>> Yuan
>>
>>
>>
>> On Thu, Oct 13, 2022 at 11:39 PM Robin Cassan via user <user@flink.apache.org> wrote:
>>>
>>> Hello all, hope you're well :)
>>> We are attempting to build a Flink job that consumes data from Kafka with minimal and stable latency (as much as possible). Currently our main limitation appears when the job checkpoints the RocksDB state: backpressure is applied on the stream, causing latency. I am wondering if there are ways to configure Flink so that the checkpointing process affects the flow of data as little as possible?
>>>
>>> In our case, backpressure seems to arise from CPU consumption, because:
>>> - CPU usage is around 80% when checkpoints aren't running, and capped at 100% when they are
>>> - checkpoint alignment time is very low, and using unaligned checkpoints doesn't appear to help with backpressure
>>> - the network (async) part of the checkpoint should in theory not cause backpressure, since resources would be free for the main stream during async waits, but I might be wrong
>>>
>>> What we would really like to achieve is isolating the compute resources used for checkpointing from those used for task slots. This would of course mean oversizing our cluster so that resources remain available for checkpointing even when it isn't running, but also accepting longer checkpoints compared to today, where checkpoints seem to use CPU cores attributed to task slots. We are OK with that to some degree, but we don't know how to achieve this isolation. Do you have any clue?
>>>
>>> Lastly, we currently have nodes with 8 cores but allocate only 6 task slots, and we have set the following:
>>>
>>> state.backend.rocksdb.thread.num: 6
>>> state.backend.rocksdb.writebuffer.count: 6
>>>
>>>
>>> Thanks all for your help!