Some additional information that I've gathered:
* The number of unique keys in the system is 10, and that is correctly reflected in the state.
* TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
* Not sure it's relevant, but the Flink cluster does run with jemalloc enabled.
* GitHub gist with the whole processor setup since it's not too long: https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678

Relevant configuration entries (explicitly set, others are left with defaults):

state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 30 s
execution.checkpointing.min-pause: 25 s
execution.checkpointing.timeout: 5 min
execution.savepoint-restore-mode: CLAIM
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

Over the weekend, state size has grown to 1.23 GB, with the operators referenced in the processor program taking 849 MB, so I'm still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn't make sense if I have finite keys, right?

Regards,
Alexis.

From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Saturday, April 9, 2022 01:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Roman,

Here's an example of a WindowReaderFunction:

    public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
        private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
                "descriptorId",
                Integer.class
        );

        @Override
        public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
            int count = 0;
            for (Integer i : context.windowState().getListState(LSD).get()) {
                count++;
            }
            out.collect(count);
        }
    }

That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().

Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4; this doesn't affect the result, does it?

Regards,
Alexis.

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives you the number of stream elements per operator. However, you mentioned that these operators have collection-type states (ListState and MapState). That means that per one entry there can be an arbitrary number of state elements.

Have you tried estimating the state sizes directly via readKeyedState [1]?

> The other operator does override and call clear()

Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-
[2] https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman
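For reference, a minimal sketch of what counting entries per key via readKeyedState [1] could look like for the operator with ListState<Pojo>. The operator uid ("global-window-operator"), the state name ("globalListStateId"), and the checkpoint path are placeholders rather than values from the gist, and Pojo is the user's POJO class from this thread:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    // Emits, for each key of one keyed operator, the number of elements in its ListState<Pojo>.
    public class GlobalStateCountReader extends KeyedStateReaderFunction<String, Integer> {

        private transient ListState<Pojo> listState;

        @Override
        public void open(Configuration parameters) {
            // The descriptor name must match the one used in the streaming job
            // ("globalListStateId" is a placeholder). If the writing side enables TTL,
            // the reader's descriptor may need the same TTL configuration.
            ListStateDescriptor<Pojo> descriptor =
                    new ListStateDescriptor<>("globalListStateId", Pojo.class);
            listState = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void readKey(String key, Context context, Collector<Integer> out) throws Exception {
            int count = 0;
            for (Pojo ignored : listState.get()) {
                count++;
            }
            out.collect(count); // number of list elements stored for this key
        }
    }

Summing the per-key counts in the processor program's main method (which declares throws Exception) could then look like:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint savepoint =
            Savepoint.load(env, "file:///path/to/chk-1234", new EmbeddedRocksDBStateBackend());
    int totalElements = savepoint
            .readKeyedState("global-window-operator", new GlobalStateCountReader())
            .collect()
            .stream()
            .reduce(0, Integer::sum);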
On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with RocksDB with incremental checkpoints as backend. I've been monitoring a dev environment that has been running for the last week and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job's setup:
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
> I have 3 operators with these states:
>
> Window state with ListState<Integer> and no TTL.
> Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState<Pojo> where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
>
> Both operators with global window state have logic to manually remove old state in addition to configured TTL. The other operator does override and call clear().
>
> I have now analyzed the checkpoint folder with the state processor API, and I'll note here that I see 50 folders named chk-*** even though I don't set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number and I see that my operators have these amounts respectively:
>
> 10 entries
> 80 entries
> 200 entries
>
> I got those numbers with something like this:
>
> savepoint
>     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>     .process(...)
>     .collect()
>     .parallelStream()
>     .reduce(0, Integer::sum);
>
> Where my WindowReaderFunction classes just count the number of entries in each call to readWindow.
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
> Regards,
> Alexis.
>
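Side note on the TTL setup described in this thread (1 hour, updated on read and write, cleaned up with cleanupInRocksdbCompactFilter(1000L)): a minimal sketch of what that configuration presumably looks like, with a placeholder descriptor name and Pojo again being the user's POJO class:

    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .updateTtlOnReadAndWrite()
            .cleanupInRocksdbCompactFilter(1000L)
            .build();

    // "globalListStateId" is a placeholder; the real descriptor name is in the linked gist.
    ListStateDescriptor<Pojo> descriptor = new ListStateDescriptor<>("globalListStateId", Pojo.class);
    descriptor.enableTimeToLive(ttlConfig);

As far as I understand, with this cleanup strategy expired entries are only physically dropped when RocksDB compaction runs the TTL filter over the corresponding files, and the 1000L is the number of state entries the filter processes before it queries the current timestamp again.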