Hello,

I have a streaming job running on Flink 1.14.4 that uses managed state with the RocksDB state backend and incremental checkpoints. I've been monitoring a dev environment that has been running for the last week, and I've noticed that state size and end-to-end checkpoint duration have been increasing steadily. Currently, the duration is 11 seconds and the size is 917MB (as shown in the UI). The tasks with the largest state (614MB) belong to keyed sliding windows. Some attributes of this job's setup:
* Windows are 11 minutes in size.
* Slide time is 1 minute.
* Throughput is approximately 20 events per minute.

I have 3 operators with these states:

1. Window state with ListState<Integer> and no TTL.
2. Global window state with MapState<Long, List<String>> and a TTL of 1 hour, configured with cleanupInRocksdbCompactFilter(1000L).
3. Global window state with ListState<Pojo>, where the Pojo holds an int and a long, also with a TTL of 1 hour and cleanupInRocksdbCompactFilter(1000L).

Both operators with global window state additionally have logic to manually remove old entries on top of the configured TTL, and the other operator overrides and calls clear().

I have now analyzed the checkpoint folder with the State Processor API. I'll note here that I see 50 folders named chk-***, even though I don't set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number, and my operators hold these amounts of entries, respectively:

1. 10 entries
2. 80 entries
3. 200 entries

I got those numbers with something like this:

    savepoint
        .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
        .process(...)
        .collect()
        .parallelStream()
        .reduce(0, Integer::sum);

where my WindowReaderFunction classes just count the number of entries in each call to readWindow (a simplified sketch of the TTL setup and one of the reader functions is in the P.S. below).

Those amounts cannot possibly account for 614MB, so what am I missing?

Regards,
Alexis.
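P.S. In case the exact configuration matters, the TTL on the two global window states is set up roughly like this. The state name and types below are simplified placeholders (shown for the MapState<Long, List<String>> of operator 2; operator 3 is configured the same way):

    import java.util.List;

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class TtlSetup {

        // Builds a placeholder descriptor with the same TTL settings as the real job.
        public static MapStateDescriptor<Long, List<String>> globalWindowStateDescriptor() {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.hours(1))
                    .cleanupInRocksdbCompactFilter(1000L)
                    .build();

            MapStateDescriptor<Long, List<String>> descriptor = new MapStateDescriptor<>(
                    "global-window-state", // placeholder name
                    TypeInformation.of(Long.class),
                    TypeInformation.of(new TypeHint<List<String>>() {}));
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }

And the reader functions are essentially just counters. A simplified version of the one for the first operator looks roughly like this; the key and element types are illustrative, and the uid and checkpoint path are placeholders, not the real ones:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.WindowReaderFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class CountEntries {

        // Emits the number of elements found in each key/window pane.
        public static class CountingReader
                extends WindowReaderFunction<Integer, Integer, Integer, TimeWindow> {

            @Override
            public void readWindow(Integer key,
                                   Context<TimeWindow> context,
                                   Iterable<Integer> elements,
                                   Collector<Integer> out) {
                int count = 0;
                for (Integer ignored : elements) {
                    count++;
                }
                out.collect(count);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Path and uid are placeholders.
            ExistingSavepoint savepoint = Savepoint.load(
                    env, "/path/to/checkpoints/chk-1234", new EmbeddedRocksDBStateBackend());

            int total = savepoint
                    .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
                    .process("window-operator-uid", new CountingReader(),
                            Types.INT, Types.INT, Types.INT)
                    .collect()
                    .parallelStream()
                    .reduce(0, Integer::sum);

            System.out.println("Total entries: " + total);
        }
    }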