Hi Alexis,

If I understand correctly, the StateProcessor program you provided
gives you the number of stream elements per operator. However, you
mentioned that these operators have collection-type state (ListState
and MapState), which means that a single entry can hold an arbitrary
number of state elements.
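
To illustrate the difference, here is a minimal sketch in plain Java.
It does not use any Flink classes: the map is only a stand-in for keyed
collection state, and the names StateElementCount, countEntries and
countElements are made up for this example.

```java
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for keyed collection state; no Flink classes are used.
final class StateElementCount {

    // Number of keyed entries: what a per-entry counter would report.
    static int countEntries(Map<String, List<Integer>> state) {
        return state.size();
    }

    // Number of stored elements across all entries: closer to what takes up space.
    static long countElements(Map<String, List<Integer>> state) {
        return state.values().stream().mapToLong(List::size).sum();
    }

    public static void main(String[] args) {
        // Hypothetical sample data: three entries holding nine elements in total.
        Map<String, List<Integer>> state = Map.of(
                "key-a", List.of(1, 2, 3),
                "key-b", List.of(4),
                "key-c", List.of(5, 6, 7, 8, 9));
        System.out.println("entries  = " + countEntries(state));
        System.out.println("elements = " + countElements(state));
    }
}
```

The two counts only agree when every entry holds exactly one element,
so a small entry count does not by itself bound the state size.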

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
> Hello,
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job’s setup:
> - Windows are 11 minutes in size.
> - Slide time is 1 minute.
> - Throughput is approximately 20 events per minute.
> I have 3 operators with these states:
> 1. Window state with ListState<Integer> and no TTL.
> 2. Global window state with MapState<Long, List<String>> and a TTL of 1 hour
>    (with cleanupInRocksdbCompactFilter(1000L)).
> 3. Global window state with ListState<Pojo>, where the Pojo has an int and a
>    long, a TTL of 1 hour, and cleanupInRocksdbCompactFilter(1000L) configured
>    as well.
> Both operators with global window state have logic to manually remove old 
> state in addition to configured TTL. The other operator does override and 
> call clear().
> I have now analyzed the checkpoint folder with the state processor API, and 
> I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
> 1. 10 entries
> 2. 80 entries
> 3. 200 entries
> I got those numbers with something like this:
> savepoint
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>         .process(...)
>         .collect()
>         .parallelStream()
>         .reduce(0, Integer::sum);
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
> Those amounts cannot possibly account for 614MB, so what am I missing?
> Regards,
> Alexis.
