Till,

I’ll have to calculate the theoretical upper bound for our window state. Our data distribution and rate have a predictable pattern, but that pattern didn’t match the checkpoint size growth.
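A back-of-envelope calculation for that upper bound, along the lines Till suggests below (keys x events per key per second x window length x event size), might look like the following plain-Java sketch. All numbers here are hypothetical placeholders, not measurements from this pipeline:

```java
// Back-of-envelope upper bound for apply()-style tumbling-window state.
// With apply(), every event that falls into an open window is kept in
// state until the window fires, so the worst case is the product below.
// All inputs are hypothetical placeholders.
public class WindowStateEstimate {
    public static long upperBoundBytes(long keys, long eventsPerKeyPerSec,
                                       long eventSizeBytes, long windowSeconds) {
        return keys * eventsPerKeyPerSec * windowSeconds * eventSizeBytes;
    }

    public static void main(String[] args) {
        // Hypothetical: 1,000 keys, 10 events/key/sec, 200-byte events,
        // and a 15-second tumbling window (the window size from the snippet below).
        long bytes = upperBoundBytes(1_000, 10, 200, 15);
        System.out.println(bytes + " bytes (" + (bytes / 1_000_000) + " MB)");
    }
}
```

Comparing such an estimate against the observed checkpoint size is a quick way to tell whether growth comes from the data (more keys, bigger or more events) or from the state backend itself.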
[screenshot: checkpoint size for the pipeline]

Here is a screenshot of the checkpoint size for the pipeline. The yellow section is when we had the checkpoint interval at 2 seconds: the size seems to grow linearly and indefinitely. The blue, red, and orange lines are in line with what I’d expect in terms of checkpoint size (100KB-2MB). The incoming stream data for the whole time period is consistent (it follows the same pattern). Changing the checkpoint interval seemed to fix the problem of the large and growing checkpoint size, but I’m not sure why.

Thanks!

-Matt

From: Till Rohrmann <trohrm...@apache.org>
Date: Thursday, May 28, 2020 at 10:48 AM
To: "Wissman, Matt" <matt.wiss...@here.com>
Cc: Guowei Ma <guowei....@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

when using tumbling windows, the checkpoint size depends not only on the number of keys (which is equivalent to the number of open windows) but also on how many events arrive for each open window, because the window stores every event in its state. Hence, you can see different checkpoint sizes depending on the actual data distribution, which can change over time.

Have you checked whether the data distribution and rate are constant over time? What are the expected number of keys, the size of events, and the number of events per key per second? Based on this information one could try to estimate an upper bound on the state size.

Cheers,
Till

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt <matt.wiss...@here.com> wrote:

Hello Till & Guowei,

Thanks for the replies!
Here is a snippet of the window function:

    SingleOutputStreamOperator<DataLayer> aggregatedStream = dataStream
        .keyBy(idKeySelector())
        .window(TumblingProcessingTimeWindows.of(seconds(15)))
        .apply(new Aggregator())
        .name("Aggregator")
        .setParallelism(3);

Checkpoint interval: 2 secs while the checkpoint size grew from 100KB to 100MB (we’ve since changed the interval to 5 minutes, which has slowed the checkpoint size growth)
Lateness allowed: 0
Watermarks: nothing is set in terms of watermarks. Do they apply for processing time?

The set of keys processed in the stream is stable over time.

The checkpoint size actually looks pretty stable now that the interval was increased. Is it possible that the short checkpoint interval prevented compaction?

Thanks!

-Matt

From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma <guowei....@gmail.com>
Cc: "Wissman, Matt" <matt.wiss...@here.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

could you give us a bit more information about the windows you are using? They are tumbling windows. What’s the size of the windows? Do you allow lateness of events? What’s your checkpoint interval? Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi Matt,

The total size of the state of the window operator is related to the number of windows.
For example, if you use keyBy + tumbling window, there would be as many windows as there are keys.

Hope this helps.

Best,
Guowei

Wissman, Matt <matt.wiss...@here.com> wrote on Wednesday, May 27, 2020 at 3:35 AM:

> Hello Flink Community,
>
> I’m running a Flink pipeline that uses a tumbling window and incremental checkpointing with RocksDB backed by S3. The number of objects in the window is stable, but over time the checkpoint size grows seemingly unbounded. Within the first few hours after bringing the Flink pipeline up, the checkpoint size is around 100K, but after a week of operation it grows to around 100MB. The pipeline isn’t using any other Flink state besides the state that the window uses. I think this has something to do with RocksDB’s compaction, but shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
> Flink Version 1.7.1
>
> Thanks!
>
> -Matt
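As Till noted above, apply()-style tumbling windows keep every element of each open window in state until the window fires, whereas Flink’s incremental aggregate() keeps a single accumulator per key and window. The difference in state footprint can be illustrated with a plain-Java sketch (no Flink dependency; the class and numbers here are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates why state size differs between buffering all window elements
// (what apply()-style windows do) and keeping one running accumulator per
// window (what an incremental aggregate() does). Plain Java, no Flink;
// all names and numbers are illustrative.
public class WindowStateSketch {
    // apply()-style: every event is retained in state until the window fires.
    static int bufferedStateCount(int eventsInWindow) {
        List<Integer> buffer = new ArrayList<>();
        for (int i = 0; i < eventsInWindow; i++) {
            buffer.add(i);          // one state entry per event
        }
        return buffer.size();       // grows with the event rate
    }

    // aggregate()-style: a single accumulator, updated per event.
    static int incrementalStateCount(int eventsInWindow) {
        long accumulator = 0;       // the only state kept
        for (int i = 0; i < eventsInWindow; i++) {
            accumulator += i;       // fold each event into the accumulator
        }
        return 1;                   // state entry count is constant
    }

    public static void main(String[] args) {
        int events = 10_000;
        System.out.println("buffered entries:    " + bufferedStateCount(events));
        System.out.println("incremental entries: " + incrementalStateCount(events));
    }
}
```

If the per-window result can be computed incrementally, switching the snippet above from apply(new Aggregator()) to aggregate(...) would bound the window state to one accumulator per open window; whether that is possible here depends on what Aggregator actually computes.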