Hi Puneet,

With the RocksDB state backend, the incremental checkpoint size is not exactly the delta of the state change; it is the size of the newly uploaded SST files (those that were not uploaded by an earlier checkpoint). These new SST files are generated by compaction or by data flushes. In other words, I don't think we should care too much about the checkpoint size. Instead, we should care more about the output results.
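To illustrate the point about newly uploaded SST files, here is a toy model in Python. It is only a sketch under simplifying assumptions, not how Flink or RocksDB is implemented: the file names, the sizes, and the helper `incremental_upload_size` are all hypothetical. It shows how a small logical change can still lead to a large upload once compaction rewrites existing files into a new one.

```python
MB = 1024 * 1024


def incremental_upload_size(current_files, already_uploaded):
    """Bytes a checkpoint would upload in this toy model.

    current_files: dict mapping SST file name -> size in bytes
    already_uploaded: set of file names uploaded by earlier checkpoints
    Only files whose names have not been uploaded before are counted.
    """
    return sum(
        size
        for name, size in current_files.items()
        if name not in already_uploaded
    )


# Checkpoint 1: two SST files exist and nothing has been uploaded yet.
cp1_files = {"000001.sst": 10 * MB, "000002.sst": 10 * MB}
cp1_size = incremental_upload_size(cp1_files, set())

# Before checkpoint 2 (hypothetical numbers): 1 MB of new data is flushed,
# then compaction merges everything into a single new file.
cp2_files = {"000004.sst": 21 * MB}
cp2_size = incremental_upload_size(cp2_files, set(cp1_files))

print(cp1_size // MB, "MB uploaded at checkpoint 1")
print(cp2_size // MB, "MB uploaded at checkpoint 2")
```

In this model the second checkpoint uploads the whole compacted file even though only 1 MB of data is logically new, which is why two jobs processing the same records can report different incremental sizes depending on when flushes and compactions happen.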
Best
Yun Tang

________________________________
From: Martijn Visser <martijnvis...@apache.org>
Sent: Wednesday, October 19, 2022 22:03
To: Puneet Duggal <puneetduggal1...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink CEP Incremental Checkpoint Issue

Hi,

Given that Flink 1.12 is no longer supported by the community, can you validate this with the latest Flink version (currently 1.15)? Next to that, the contents of your checkpoints are not only the results of your CEP; since you're using Exactly Once, they also include internal information needed to provide those exactly-once guarantees.

Best regards,
Martijn

On Mon, Oct 17, 2022 at 10:09 PM Puneet Duggal <puneetduggal1...@gmail.com> wrote:

Apologies for the mistake in the calculation:
120*6*2KB = 1440KB = 1.4MB

> On 18-Oct-2022, at 1:35 AM, Puneet Duggal <puneetduggal1...@gmail.com> wrote:
>
> Hi,
>
> I am working on a use case which uses Flink CEP for pattern detection.
>
> Flink Version - 1.12.1
> Deployment Mode - Session Mode (Highly Available)
> State Backend - RocksDB
> Checkpoint Interval - 2 mins
> Checkpoint Mode - Exactly Once
>
> CEP pattern looks something like - A not_followed_by B within (40mins)
> After Match Skip Strategy - Skip Past Last Event
>
> In order to test out incremental checkpointing and its size, I deployed a job
> on a cluster (let's say cluster A, hence job name J(aa)) and that same job on
> cluster B 1 week later (job name J(ab)). Basically, at any given point in
> time, both jobs (J(aa) and J(ab)) process exactly the same records. After 1
> week of deployment of J(ab), I found out that in spite of working on the same
> records and a window time of 40mins (after which unmatched patterns should
> expire), the incremental checkpoint size of J(aa) is around 40-50MB whereas
> that of J(ab) is 25-30MB.
> My assumption about incremental checkpoints is that a checkpoint only
> contains the delta state change since the last checkpoint, which is the
> same for both jobs. Attached screenshots for J(ab) and J(aa) respectively.
>
> J(ab)
>
> <Screenshot 2022-10-18 at 1.25.18 AM.png>
>
> J(aa)
>
> <Screenshot 2022-10-18 at 1.26.25 AM.png>
>
> Checkpoint Configuration
>
> <Screenshot 2022-10-18 at 1.29.10 AM.png>
>
> One more doubt along the same lines: these jobs consume on average 6
> events per second, with each event around 2KB in size. Assuming a
> checkpoint interval of 2 mins and each event getting stored in CEP state,
> the total delta size of the state should be 2*60*6*1.32 = 316KB, which is
> nowhere near the size shown in the incremental checkpoint for both jobs.
> Even including meta info for these records, I am not sure what I am missing
> that is causing the incremental checkpoints to be so huge.
>
> Regards,
> Puneet
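For reference, the back-of-the-envelope estimate from this thread can be reproduced with a few lines of Python. This is only a sketch of the arithmetic, using the figures quoted above (2-minute interval, 6 events per second, about 2KB per event) and assuming every event is stored in CEP state exactly once with no serialization or metadata overhead. The function name `raw_delta_kb` is made up for illustration.

```python
CHECKPOINT_INTERVAL_S = 2 * 60  # checkpoint interval of 2 minutes
EVENTS_PER_SECOND = 6           # average ingest rate quoted in the thread
EVENT_SIZE_KB = 2               # approximate size of one event


def raw_delta_kb(interval_s, events_per_s, event_kb):
    """Raw payload (in KB) arriving between two checkpoints.

    Assumes each event is kept in state once, with no extra overhead.
    """
    return interval_s * events_per_s * event_kb


delta_kb = raw_delta_kb(CHECKPOINT_INTERVAL_S, EVENTS_PER_SECOND, EVENT_SIZE_KB)
delta_mb = delta_kb / 1024

print(f"{delta_kb} KB per checkpoint interval (about {delta_mb:.1f} MB)")
```

This matches the corrected figure of 1440KB (about 1.4MB). It is an estimate of the logical state delta only, not of the number of bytes an incremental checkpoint reports.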