Thanks, Geng, for the quick and actionable response. I will definitely try this with Flink version >= 1.16.0 and report back with my observations.
Regarding the checkpoint size issue, my concern is this: if there is no more state, shouldn't the checkpoint size be far less than 2 GB? I was expecting it to be only a few MB. Is there something I am missing here?

Regards,
Abhishek Singla

On Sun, Mar 26, 2023 at 9:56 PM Geng Biao <biaoge...@gmail.com> wrote:

> Hi Abhishek,
>
> Thanks for sharing the experiment! As for the performance question, I
> believe you could try Flink CEP with version >= 1.16.0, which
> includes the optimization introduced in FLINK-23890
> <https://issues.apache.org/jira/browse/FLINK-23890>. This optimization
> greatly reduces the number of timer registrations, which can increase
> throughput significantly. In our own experiment, given the same
> parallelism settings, the same job on 1.16.0 requires much less CPU than
> on 1.15.x (~100% -> ~30%). In fact, due to the implementation, the
> optimization should make CEP 10x better. If you must use Flink 1.15.0 for
> some reason, you may cherry-pick the relevant change and recompile the CEP
> library yourself. The change does not depend on any framework changes, so
> it should not take much effort.
>
> As for the checkpoint size issue, the CEP operator stores intermediate
> matching results in state. So if there are no new events, there are
> no new partial matches and the CEP operator will not use more state.
>
> Best,
> Biao Geng
>
> *From: *Abhishek Singla <abhisheksingla...@gmail.com>
> *Date: *Sunday, March 26, 2023 at 11:58 PM
> *To: *user@flink.apache.org <user@flink.apache.org>
> *Subject: *Flink CEP Resource Utilisation Optimisation
>
> Hi Team,
>
> *Flink Version:* 1.15.0
> *Java Version:* 1.8
> *Standalone Cluster*
> *Task Manager:* AWS EC2 of Instance Type c5n.4xlarge (vCPU 16, Memory 42
> GB, 8 slots per TM)
> *CEP Scenario:* Kafka Event A followed by Kafka Event B within 10 mins
> *Throughput:* 20k events per second for Event A, 0 for Kafka Event B
> *State Backend:* FsStateBackend
> *Unaligned Checkpoints:* Enabled
> *asynchronousSnapshots:* true
>
> While testing this scenario (Kafka Event A followed by Kafka Event B
> within 10 mins) in our load environment, it took 20 TM nodes to achieve
> this throughput; otherwise either CPU utilization would peak or
> backpressure would be observed because the output buffers were full. The
> checkpoint size is only 6.75 GB, and the state stored within the CEP
> operator would be much smaller, since we use unaligned checkpointing.
>
> I am looking for some input on whether it should take this many resources
> to achieve this throughput and, if not, what the issue could be.
>
> There was one more issue I found: if the throughput of Event A drops to
> zero, the checkpoint size still stays around 2 GB even after hours. Is
> this expected?
>
> Regards,
> Abhishek Singla
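
As a rough sanity check on the numbers in the original message, the sketch below estimates how many partial matches the CEP operator would hold at steady state. It assumes that every Event A opens one partial match that lives for the full 10-minute window (Event B never arrives), and it treats the whole 6.75 GB checkpoint as CEP state, which overstates the per-match size because unaligned checkpoints also contain in-flight buffers. The class and method names are hypothetical; none of this is a Flink API.

```java
// Back-of-the-envelope estimate only; not part of Flink.
final class PartialMatchEstimate {

    /** Partial matches alive at steady state = arrival rate x window length. */
    static long pendingPartialMatches(long eventsPerSecond, long windowSeconds) {
        return Math.multiplyExact(eventsPerSecond, windowSeconds);
    }

    /** Upper bound on bytes per partial match, if the whole checkpoint were CEP state. */
    static double bytesPerPartialMatch(double checkpointBytes, long partialMatches) {
        if (partialMatches <= 0) {
            throw new IllegalArgumentException("partialMatches must be positive");
        }
        return checkpointBytes / partialMatches;
    }

    public static void main(String[] args) {
        long eventsPerSecond = 20_000L;   // Event A rate from the original message
        long windowSeconds = 10 * 60L;    // "within 10 mins"
        double checkpointBytes = 6.75e9;  // 6.75 GB, assuming 1 GB = 10^9 bytes

        long pending = pendingPartialMatches(eventsPerSecond, windowSeconds);
        double perMatch = bytesPerPartialMatch(checkpointBytes, pending);

        System.out.println("pending partial matches: " + pending);
        System.out.println("bytes per partial match (upper bound): " + perMatch);
    }
}
```

With the figures from this thread, that works out to 12 million pending partial matches and at most about 560 bytes per match. By the same reasoning, and assuming timed-out partial matches are pruned once the window expires, the state should shrink toward zero some time after Event A stops, which is why a checkpoint that stays near 2 GB for hours looks surprising.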