Hello,

I have a streaming job running on Flink 1.14.4 that uses managed state with 
RocksDB with incremental checkpoints as backend. I've been monitoring a dev 
environment that has been running for the last week and I noticed that state 
size and end-to-end duration have been increasing steadily. Currently, duration 
is 11 seconds and size is 917MB (as shown in the UI). The tasks with the 
largest state (614MB) come from keyed sliding windows. Some attributes of this 
job's setup:


  *   Windows are 11 minutes in size.
  *   Slide time is 1 minute.
  *   Throughput is approximately 20 events per minute.

I have 3 operators with these states:


  1.  Window state with ListState<Integer> and no TTL.
  2.  Global window state with MapState<Long, List<String>> and a TTL of 1 hour 
(with cleanupInRocksdbCompactFilter(1000L)).
  3.  Global window state with ListState<Pojo> where the Pojo has an int and a 
long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) 
as well.

Both operators with global window state have logic to manually remove old state 
in addition to configured TTL. The other operator does override and call 
clear().

I have now analyzed the checkpoint folder with the state processor API, and 
I'll note here that I see 50 folders named chk-*** even though I don't set 
state.checkpoints.num-retained and the default should be 1. I loaded the data 
from the folder with the highest chk number and I see that my operators have 
these amounts respectively:


  1.  10 entries
  2.  80 entries
  3.  200 entries

I got those numbers with something like this:

savepoint
        .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
        .process(...)
        .collect()
        .parallelStream()
        .reduce(0, Integer::sum);

Where my WindowReaderFunction classes just count the number of entries in each 
call to readWindow.

Those amounts cannot possibly account for 614MB, so what am I missing?

Regards,
Alexis.

Reply via email to