Hello,

There was a network issue in my environment and the job had to restart. After 
the job came back up, the logs showed a lot of lines like this:

RocksDBIncrementalRestoreOperation ... Starting to restore from state handle: 
...

Interestingly, those entries include information about sizes in bytes:

- 445163.sst=ByteStreamStateHandle{handleName='file:/opt/flink/state/checkpoints/00000000000000000000000000000000/shared/18f95afa-dc66-467d-bd05-779895f24960', dataBytes=1328}
- privateState={MANIFEST-000004=File State: file:/opt/flink/state/checkpoints/00000000000000000000000000000000/shared/bd7fde24-3ef6-4e05-bbd6-1474f8051d5d [80921331 bytes]

I extracted a lot of that information and I can see that:

- If I sum all dataBytes from sharedState (e.g. with a parser like the one sketched below), that only accounts for a couple of MB.
- Most of the state comes from privateState, specifically from the entries 
referring to MANIFEST File State; that accounts for almost 1.5GB.
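
For reference, this is roughly the kind of parsing I mean (only a sketch; the 
patterns come from the log lines above, everything else is illustrative):

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RestoreLogSizes {
        // "dataBytes=1328" appears in ByteStreamStateHandle entries,
        // "[80921331 bytes]" in File State entries (e.g. the MANIFEST files).
        private static final Pattern DATA_BYTES = Pattern.compile("dataBytes=(\\d+)");
        private static final Pattern FILE_BYTES = Pattern.compile("\\[(\\d+) bytes\\]");

        public static void main(String[] args) throws Exception {
            long handleBytes = 0;
            long fileBytes = 0;
            for (String line : Files.readAllLines(Paths.get(args[0]))) {
                Matcher m = DATA_BYTES.matcher(line);
                while (m.find()) {
                    handleBytes += Long.parseLong(m.group(1));
                }
                m = FILE_BYTES.matcher(line);
                while (m.find()) {
                    fileBytes += Long.parseLong(m.group(1));
                }
            }
            System.out.printf("dataBytes total: %d, File State total: %d%n",
                    handleBytes, fileBytes);
        }
    }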

I believe that MANIFEST is one of the files RocksDB uses internally, but is it 
related to the managed state used by my functions? Or does it indicate that the 
size growth is elsewhere?

Regards,
Alexis.

-----Original Message-----
From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> 
Sent: Tuesday, 12 April 2022 15:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: RE: RocksDB's state size discrepancy with what's seen with state 
processor API

Thanks for all the pointers. The UI does show combined state for a chain, but 
the only state descriptors inside that chain are the 3 I mentioned before. The 
combined state size is still increasing today, and checkpoint duration is now 
around 30 seconds (I can't use unaligned checkpoints because I use 
partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but 
I don't see anything weird. The counts go up and down depending on which one I 
load, but even the bigger ones have around 500-700 entries, which should only 
be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

    timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
    windowedStream    = timestampedStream -> reinterpretAsKeyedStream -> window (SlidingEventTimeWindows)
    windowedStream -> process1 -> sink1
    windowedStream -> process2 -> sink2
    windowedStream -> process3 -> map
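
In (approximate) code, assuming a Pojo element type and placeholder names for 
the key selector, watermark strategy, process functions and sinks, the chain 
looks roughly like this:

    SingleOutputStreamOperator<Pojo> timestampedStream = inputStream
            .keyBy(Pojo::getKey)
            .assignTimestampsAndWatermarks(watermarkStrategy);

    WindowedStream<Pojo, String, TimeWindow> windowedStream = DataStreamUtils
            .reinterpretAsKeyedStream(timestampedStream, Pojo::getKey)
            .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)));

    windowedStream.process(new Process1()).addSink(sink1);
    windowedStream.process(new Process2()).addSink(sink2);
    windowedStream.process(new Process3()).map(new Mapper1());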

And according to the Low Watermark I see in the UI, event time is advancing 
correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? 
I assume the window operators save some information in the state as well.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org>
Sent: Tuesday, 12 April 2022 14:06
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

The /shared folder contains keyed state that is shared among different 
checkpoints [1]. Most of the state should be shared in your case, since you're 
using keyed state and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is 
loaded as well. I suggested loading different checkpoints (i.e. chk-* folders) 
and comparing the numbers of objects in their states. To prevent the job from 
discarding that state, it can either be stopped for some time and then 
restarted from the latest checkpoint, or the number of retained checkpoints can 
be increased [2]. Copying isn't necessary.
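
For example, in flink-conf.yaml (the value here is only an illustration):

    state.checkpoints.num-retained: 10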

Besides that, you can also check the state sizes of operators in the Flink Web 
UI (but not the sizes of individual states). If operators are chained, then 
their combined state size will be shown. To prevent this, you can disable 
chaining [3] (although this will have a performance impact).
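
For illustration (operator and sink names are placeholders), chaining can be 
disabled either globally or for a single operator:

    // globally, on the execution environment:
    env.disableOperatorChaining();

    // or only for one operator, so that it shows up separately in the UI:
    windowedStream
            .process(new Process1())
            .disableChaining()
            .addSink(sink1);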

Individual checkpoint folders should be eventually removed (when the checkpoint 
is subsumed). However, this is not guaranteed: if there is any problem during 
deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> You suggest comparing counts of objects in different checkpoints; I assume 
> you mean copying my "checkpoints" folder at different times and comparing the 
> copies, not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did 
> look at the folder in the running system, and I noticed that most of the 
> chk-* folders have remained unchanged; there are only 1 or 2 new folders 
> corresponding to newer checkpoints. I would think this makes sense since the 
> configuration specifies that only 1 completed checkpoint should be retained, 
> but then why are the older chk-* folders still there? I did trigger a manual 
> restart of the Flink cluster in the past (before starting the long-running 
> test), but since my policy is to CLAIM the checkpoint, Flink's documentation 
> states that it would be cleaned up eventually.
>
> Moreover, just by looking at folder sizes with "du", I can see that most of 
> the state is held in the "shared" folder, and that has grown for sure; I'm 
> not sure what "shared" usually holds, but if that's what's growing, maybe I 
> can rule out expired state staying around? My pipeline doesn't use timers, 
> although I guess Flink itself may use them. Is there any way I could get some 
> insight into which operator holds the larger states?
>
> Regards,
> Alexis.
>
> -----Original Message-----
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Tuesday, 12 April 2022 12:37
> To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> Hi Alexis,
>
> Thanks a lot for sharing this. I think the program is correct.
> It doesn't take timers into account, though; and to estimate the state size 
> more accurately, you could also use the same serializers the job uses.
> But maybe it makes more sense to compare the counts of objects in different 
> checkpoints and see which state is growing.
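>
> For example, a rough per-value estimate could look like this (Pojo and the 
> "pojo" variable stand for one of your state types/values; this is only a 
> sketch):
>
>     TypeSerializer<Pojo> serializer =
>             TypeInformation.of(Pojo.class).createSerializer(new ExecutionConfig());
>     DataOutputSerializer buffer = new DataOutputSerializer(256);
>     serializer.serialize(pojo, buffer);    // same serialized form the backend stores for values
>     long approxBytes = buffer.length();    // serialized size of this one value
>     buffer.clear();                        // reuse the buffer for the next value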
>
> If the number of keys is small, compaction should eventually clean up the old 
> values, given that the windows eventually expire. I think it makes sense to 
> check that watermarks in all windows are making progress.
>
> Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results 
> of the State Processor program.
>
> Regards,
> Roman
>
> On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa 
> <alexis.sarda-espin...@microfocus.com> wrote:
> >
> > Some additional information that I’ve gathered:
> >
> >
> >
> > - The number of unique keys in the system is 10, and that is correctly
> >   reflected in the state.
> > - TTL for global window state is set to update on read and write, but the
> >   code has logic to remove old state based on event time.
> > - Not sure it's relevant, but the Flink cluster does run with jemalloc
> >   enabled.
> > - GitHub gist with the whole processor setup since it's not too long:
> >   https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> >
> >
> >
> > Relevant configuration entries (explicitly set, others are left with 
> > defaults):
> >
> >
> >
> > state.backend: rocksdb
> > state.backend.incremental: true
> > execution.checkpointing.interval: 30 s
> > execution.checkpointing.min-pause: 25 s
> > execution.checkpointing.timeout: 5 min
> > execution.savepoint-restore-mode: CLAIM
> > execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> >
> >
> >
> > Over the weekend, state size has grown to 1.23GB with the operators 
> > referenced in the processor program taking 849MB, so I’m still pretty 
> > puzzled. I thought it could be due to expired state being retained, but I 
> > think that doesn’t make sense if I have finite keys, right?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > Sent: Saturday, 9 April 2022 01:39
> > To: ro...@apache.org
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> >
> >
> > Hi Roman,
> >
> >
> >
> > Here's an example of a WindowReaderFunction:
> >
> >
> >
> >     // Counts the number of elements in the window ListState for each key+window.
> >     public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> >         private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
> >                 "descriptorId",
> >                 Integer.class
> >         );
> >
> >         @Override
> >         public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
> >             int count = 0;
> >             for (Integer i : context.windowState().getListState(LSD).get()) {
> >                 count++;
> >             }
> >             out.collect(count);
> >         }
> >     }
> >
> >
> >
> > That's for the operator that uses window state. The other readers do 
> > something similar but with context.globalState(). That should provide the 
> > number of state entries for each key+window combination, no? And after 
> > collecting all results, I would get the number of state entries across all 
> > keys+windows for an operator.
> >
> >
> >
> > And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
> > context.windowState().getListState(...).clear().
> >
> >
> >
> > Side note: in the state processor program I call 
> > ExecutionEnvironment#setParallelism(1) even though my streaming job runs 
> > with parallelism=4; this doesn't affect the result, does it?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
> >
> > ________________________________
> >
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Friday, April 8, 2022 11:06 PM
> > To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > Cc: user@flink.apache.org <user@flink.apache.org>
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> >
> >
> > Hi Alexis,
> >
> > If I understand correctly, the provided StateProcessor program gives 
> > you the number of stream elements per operator. However, you 
> > mentioned that these operators have collection-type states 
> > (ListState and MapState). That means that each entry can contain an 
> > arbitrary number of state elements.
> >
> > Have you tried estimating the state sizes directly via readKeyedState[1]?
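> >
> > A minimal sketch of what reading that state per key could look like (here 
> > just counting list elements; the operator uid, key type and descriptor are 
> > placeholders for your job's actual ones):
> >
> >     DataSet<Long> elementCounts = savepoint.readKeyedState(
> >             "my-operator-uid",
> >             new KeyedStateReaderFunction<String, Long>() {
> >                 private transient ListState<Pojo> state;
> >
> >                 @Override
> >                 public void open(Configuration parameters) {
> >                     state = getRuntimeContext().getListState(
> >                             new ListStateDescriptor<>("descriptorId", Pojo.class));
> >                 }
> >
> >                 @Override
> >                 public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
> >                     long elements = 0;
> >                     for (Pojo ignored : state.get()) {
> >                         elements++;
> >                     }
> >                     out.collect(elements);
> >                 }
> >             });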
> >
> > > The other operator does override and call clear()
> > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-
> >
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> > <alexis.sarda-espin...@microfocus.com> wrote:
> > >
> > > Hello,
> > >
> > >
> > >
> > > I have a streaming job running on Flink 1.14.4 that uses managed state 
> > > with the RocksDB backend and incremental checkpoints. I've been 
> > > monitoring a dev environment that has been running for the last week, and 
> > > I noticed that state size and end-to-end duration have been increasing 
> > > steadily. Currently, duration is 11 seconds and size is 917MB (as shown 
> > > in the UI). The tasks with the largest state (614MB) come from keyed 
> > > sliding windows. Some attributes of this job's setup:
> > >
> > >
> > >
> > > - Windows are 11 minutes in size.
> > > - Slide time is 1 minute.
> > > - Throughput is approximately 20 events per minute.
> > >
> > >
> > >
> > > I have 3 operators with these states:
> > >
> > >
> > >
> > > - Window state with ListState<Integer> and no TTL.
> > > - Global window state with MapState<Long, List<String>> and a TTL of 1 hour
> > >   (with cleanupInRocksdbCompactFilter(1000L)).
> > > - Global window state with ListState<Pojo> where the Pojo has an int and a
> > >   long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > >
> > >
> > >
> > > Both operators with global window state have logic to manually remove old 
> > > state in addition to configured TTL. The other operator does override and 
> > > call clear().
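> > >
> > > For reference, the TTL setup is along these lines (a sketch; the descriptor
> > > name and exact builder calls in the job may differ):
> > >
> > >     StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
> > >             .updateTtlOnReadAndWrite()
> > >             .cleanupInRocksdbCompactFilter(1000L)
> > >             .build();
> > >
> > >     MapStateDescriptor<Long, List<String>> descriptor = new MapStateDescriptor<>(
> > >             "globalMapState", Types.LONG, Types.LIST(Types.STRING));
> > >     descriptor.enableTimeToLive(ttlConfig);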
> > >
> > >
> > >
> > > I have now analyzed the checkpoint folder with the state processor API, 
> > > and I’ll note here that I see 50 folders named chk-*** even though I 
> > > don’t set state.checkpoints.num-retained and the default should be 1. I 
> > > loaded the data from the folder with the highest chk number and I see 
> > > that my operators have these amounts respectively:
> > >
> > >
> > >
> > > - 10 entries
> > > - 80 entries
> > > - 200 entries
> > >
> > >
> > >
> > > I got those numbers with something like this:
> > >
> > >
> > >
> > >     savepoint
> > >         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
> > >         .process(...)
> > >         .collect()
> > >         .parallelStream()
> > >         .reduce(0, Integer::sum);
> > >
> > >
> > >
> > > Where my WindowReaderFunction classes just count the number of entries in 
> > > each call to readWindow.
> > >
> > >
> > >
> > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Alexis.
> > >
> > >
