Hi Alexis,

If I understand correctly, the provided StateProcessor program gives you the number of stream elements per operator. However, you mentioned that these operators have collection-type states (ListState and MapState). That means each entry can hold an arbitrary number of state elements, so the entry count alone says little about the state size.
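
To illustrate the difference between counting entries and counting state elements, here is a rough sketch in plain Java. It does not use the Flink API: the map is a hypothetical stand-in for what a reader function would see per key, and the figure of 1,000 elements per entry is invented for the example (only the 10 entries come from your numbers).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for keyed collection state: key -> contents of a ListState<Integer>.
class StateCountSketch {

    // Counts top-level entries, i.e. what a counter incremented once per reader call reports.
    static int countEntries(Map<String, List<Integer>> stateByKey) {
        return stateByKey.size();
    }

    // Counts the elements held inside the collection state of every entry.
    static long countElements(Map<String, List<Integer>> stateByKey) {
        long total = 0;
        for (List<Integer> values : stateByKey.values()) {
            total += values.size();
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> state = new HashMap<>();
        // 10 entries (as reported), each holding an assumed 1,000 list elements.
        for (int k = 0; k < 10; k++) {
            List<Integer> values = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                values.add(i);
            }
            state.put("key-" + k, values);
        }
        System.out.println("entries  = " + countEntries(state));  // entries  = 10
        System.out.println("elements = " + countElements(state)); // elements = 10000
    }
}
```

In a real reader function, the second kind of count would mean iterating over the ListState or MapState contents instead of incrementing a counter once per call.
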
Have you tried estimating the state sizes directly via readKeyedState [1]?

> The other operator does override and call clear()

Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-
[2] https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman

On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev
> environment that has been running for the last week and I noticed that state
> size and end-to-end duration have been increasing steadily. Currently,
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with
> the largest state (614MB) come from keyed sliding windows. Some attributes of
> this job’s setup:
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
> I have 3 operators with these states:
>
> Window state with ListState<Integer> and no TTL.
> Global window state with MapState<Long, List<String>> and a TTL of 1 hour
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState<Pojo> where the Pojo has an int and a
> long, a TTL of 1 hour, and configured with
> cleanupInRocksdbCompactFilter(1000L) as well.
>
> Both operators with global window state have logic to manually remove old
> state in addition to configured TTL. The other operator does override and
> call clear().
>
> I have now analyzed the checkpoint folder with the state processor API, and
> I’ll note here that I see 50 folders named chk-*** even though I don’t set
> state.checkpoints.num-retained and the default should be 1. I loaded the data
> from the folder with the highest chk number and I see that my operators have
> these amounts respectively:
>
> 10 entries
> 80 entries
> 200 entries
>
> I got those numbers with something like this:
>
> savepoint
>     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>     .process(...)
>     .collect()
>     .parallelStream()
>     .reduce(0, Integer::sum);
>
> Where my WindowReaderFunction classes just count the number of entries in
> each call to readWindow.
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
> Regards,
> Alexis.