The State Processor API works at a higher level and is not aware of any RocksDB specifics (in fact, it can be used with any backend).

Regards,
Roman
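For the question below about "parsing" the MANIFEST files: Flink itself doesn't ship a MANIFEST parser, but the RocksJava classes bundled with the RocksDB state backend can at least list the column families (i.e. the registered state names) recorded there. A minimal sketch, assuming the path points at a copy of a task manager's local RocksDB working directory (see state.backend.rocksdb.localdir), not at the checkpoint's shared/ folder:

import java.util.List;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ListColumnFamilies {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // Path to the "db" directory of one operator instance, e.g. copied from a task manager.
        String dbPath = args[0];
        try (Options options = new Options()) {
            // Reads the column family names recorded in the database's MANIFEST.
            List<byte[]> columnFamilies = RocksDB.listColumnFamilies(options, dbPath);
            for (byte[] name : columnFamilies) {
                System.out.println(new String(name));
            }
        }
    }
}

Each Flink state name should appear as one column family; a list that keeps growing would point at new states being registered.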
On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> I can look into RocksDB metrics; I need to configure Prometheus at some point anyway. However, going back to the original question, is there no way to gain more insight into this with the state processor API? You've mentioned potential issues (too many states, missing compaction) but, with my admittedly limited understanding of the way RocksDB is used in Flink, I would have thought that such things would be visible when using the state processor. Is there no way for me to "parse" those MANIFEST files with some of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
>
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted, which increases the MANIFEST file size. I'd check the total number of loaded SST files and the creation date of the oldest one.
>
> You can also see whether there are any compactions running via the RocksDB metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
> [3] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters
>
> Regards,
> Roman
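A minimal sketch of the reporter and metrics configuration Roman refers to above, assuming the Prometheus reporter is used (the reporter name "prom" and the port are illustrative; the RocksDB option names are the ones from the linked docs):

state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.compaction-pending: true
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249

The native RocksDB metrics are reported per column family, so they should also give a quick view of which state names are registered in each operator.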
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors with different names? Because, in the case of windowing for example, each window "instance" has its own scoped (non-global and keyed) state, but that's not regarded as a separate column family, is it?
> >
> > The 3 descriptors I mentioned before are only instantiated once and used like this:
> >
> > - Window list state: each call to process() executes context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in their constructors, i.e. before they are serialized to the TM, but the descriptors are Serializable, so I imagine that's not relevant.
> >
> > [1] https://stackoverflow.com/a/50510054/5793905
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Dienstag, 19. April 2022 11:48
> > To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for the information.
> >
> > MANIFEST files list RocksDB column families (among other info); the ever-growing size of these files might indicate that some new states are constantly being created. Could you please confirm that the number of state names is constant?
> >
> > > Could you confirm if Flink's own operators could be creating state in RocksDB? I assume the window operators save some information in the state as well.
> > That's correct: window operators maintain a list of elements per window and a set of timers (timestamps). These states' names should be fixed (something like "window-contents" and "window-timers").
> >
> > > is that related to managed state used by my functions? Or does that indicate size growth is elsewhere?
> > The same mechanism is used for both Flink internal state and operator state, so it's hard to say without at least knowing the state names.
> >
> > Regards,
> > Roman
> >
> > On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
> > >
> > > The /shared folder contains keyed state that is shared among different checkpoints [1]. Most of the state should be shared in your case since you're using keyed state and incremental checkpoints.
> > >
> > > When a checkpoint is loaded, the state that it shares with older checkpoints is loaded as well. I suggested loading different checkpoints (i.e. chk-* folders) and comparing the numbers of objects in their states. To prevent the job from discarding the state, it can either be stopped for some time and then restarted from the latest checkpoint, or the number of retained checkpoints can be increased [2]. Copying isn't necessary.
> > >
> > > Besides that, you can also check the state sizes of operators in the Flink Web UI (but not the sizes of individual states). If the operators are chained, their combined state size will be shown. To prevent this, you can disable chaining [3] (although this will have a performance impact).
> > >
> > > Individual checkpoint folders should eventually be removed (when the checkpoint is subsumed). However, this is not guaranteed: if there is any problem during deletion, it will be logged, but the job will not fail.
> > >
> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
> > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
> > > [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining
> > >
> > > Regards,
> > > Roman
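For reference, the concrete settings behind [2] and [3] above (the values below are only examples): the number of retained checkpoints is controlled by state.checkpoints.num-retained, and chaining can be disabled either in code via env.disableOperatorChaining() or with the pipeline option.

state.checkpoints.num-retained: 5
pipeline.operator-chaining: false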
> > > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
> > > >
> > > > Hi Roman,
> > > >
> > > > Maybe I'm misunderstanding the structure of the data within the checkpoint. You suggest comparing counts of objects in different checkpoints; I assume you mean copying my "checkpoints" folder at different times and comparing, not comparing different "chk-*" folders in the same snapshot, right?
> > > >
> > > > I haven't executed the processor program with a newer checkpoint, but I did look at the folder in the running system, and I noticed that most of the chk-* folders have remained unchanged; there are only 1 or 2 new folders corresponding to newer checkpoints. I would think this makes sense since the configuration specifies that only 1 completed checkpoint should be retained, but then why are the older chk-* folders still there? I did trigger a manual restart of the Flink cluster in the past (before starting the long-running test), but if my policy is to CLAIM the checkpoint, Flink's documentation states that it would be cleaned eventually.
> > > >
> > > > Moreover, just by looking at folder sizes with "du", I can see that most of the state is held in the "shared" folder, and that has grown for sure; I'm not sure what "shared" usually holds, but if that's what's growing, maybe I can rule out expired state staying around? My pipeline doesn't use timers, although I guess Flink itself may use them. Is there any way I could get some insight into which operator holds larger states?
> > > >
> > > > Regards,
> > > > Alexis.
> > > >
> > > > -----Original Message-----
> > > > From: Roman Khachatryan <ro...@apache.org>
> > > > Sent: Dienstag, 12. April 2022 12:37
> > > > To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > > > Cc: user@flink.apache.org
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
> > > >
> > > > Hi Alexis,
> > > >
> > > > Thanks a lot for sharing this. I think the program is correct, although it doesn't take timers into account; to estimate the state size more accurately, you could also use the same serializers used by the job. But maybe it makes more sense to compare the counts of objects in different checkpoints and see which state is growing.
> > > >
> > > > If the number of keys is small, compaction should eventually clean up the old values, given that the windows eventually expire. I think it makes sense to check that watermarks in all windows are making progress.
> > > >
> > > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of the State Processor program.
> > > >
> > > > Regards,
> > > > Roman
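A minimal sketch of the "same serializers" idea mentioned above, assuming Pojo stands for the state class used by the job; instead of counting entries, the reader functions could sum the number of bytes each value occupies when written with the serializer Flink derives for its type (TTL wrappers and RocksDB key overhead are not included, so this is a lower bound):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public final class StateSizeEstimator {

    // Serializes a single state value and returns its size in bytes.
    public static <T> int serializedSize(T value, Class<T> type) throws java.io.IOException {
        TypeSerializer<T> serializer = TypeInformation.of(type).createSerializer(new ExecutionConfig());
        DataOutputSerializer out = new DataOutputSerializer(64); // initial buffer size, grows as needed
        serializer.serialize(value, out);
        return out.getCopyOfBuffer().length;
    }
}

Inside readWindow, the loop could then accumulate StateSizeEstimator.serializedSize(i, Integer.class) instead of incrementing a counter, so the program would report bytes per key+window rather than entry counts.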
> > > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
> > > > >
> > > > > Some additional information that I’ve gathered:
> > > > >
> > > > > - The number of unique keys in the system is 10, and that is correctly reflected in the state.
> > > > > - TTL for global window state is set to update on read and write, but the code has logic to remove old state based on event time.
> > > > > - Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> > > > > - GitHub gist with the whole processor setup since it’s not too long: https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> > > > >
> > > > > Relevant configuration entries (explicitly set, others are left with defaults):
> > > > >
> > > > > state.backend: rocksdb
> > > > > state.backend.incremental: true
> > > > > execution.checkpointing.interval: 30 s
> > > > > execution.checkpointing.min-pause: 25 s
> > > > > execution.checkpointing.timeout: 5 min
> > > > > execution.savepoint-restore-mode: CLAIM
> > > > > execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> > > > >
> > > > > Over the weekend, state size has grown to 1.23GB, with the operators referenced in the processor program taking 849MB, so I’m still pretty puzzled. I thought it could be due to expired state being retained, but I think that doesn’t make sense if I have finite keys, right?
> > > > >
> > > > > Regards,
> > > > > Alexis.
> > > > >
> > > > > From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > > > > Sent: Samstag, 9. April 2022 01:39
> > > > > To: ro...@apache.org
> > > > > Cc: user@flink.apache.org
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
> > > > >
> > > > > Hi Roman,
> > > > >
> > > > > Here's an example of a WindowReaderFunction:
> > > > >
> > > > > public class StateReaderFunction extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > > >     private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
> > > > >             "descriptorId",
> > > > >             Integer.class
> > > > >     );
> > > > >
> > > > >     @Override
> > > > >     public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
> > > > >         int count = 0;
> > > > >         for (Integer i : context.windowState().getListState(LSD).get()) {
> > > > >             count++;
> > > > >         }
> > > > >         out.collect(count);
> > > > >     }
> > > > > }
> > > > >
> > > > > That's for the operator that uses window state. The other readers do something similar but with context.globalState(). That should provide the number of state entries for each key+window combination, no? And after collecting all results, I would get the number of state entries across all keys+windows for an operator.
> > > > >
> > > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call context.windowState().getListState(...).clear().
> > > > >
> > > > > Side note: in the state processor program I call ExecutionEnvironment#setParallelism(1) even though my streaming job runs with parallelism=4; this doesn't affect the result, does it?
> > > > >
> > > > > Regards,
> > > > > Alexis.
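For the readers that use context.globalState(), a sketch of how the element count inside a collection-type state could be gathered; the descriptor name and the MapState types follow the description in this thread, and Pojo is the job's element class, so all names here are illustrative:

import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class GlobalMapStateReader extends WindowReaderFunction<Pojo, Long, String, TimeWindow> {

    private static final MapStateDescriptor<Long, List<String>> MSD = new MapStateDescriptor<>(
            "globalMapDescriptorId",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<List<String>>() {}));

    @Override
    public void readWindow(String key, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<Long> out) throws Exception {
        long elementCount = 0;
        // Count individual list elements, not just map entries: a single entry can hold an arbitrarily long list.
        for (Map.Entry<Long, List<String>> entry : context.globalState().getMapState(MSD).entries()) {
            elementCount += entry.getValue().size();
        }
        out.collect(elementCount);
    }
}

It would be driven by the same savepoint.window(...).process(...) pipeline shown further down in the thread, just summing Long values instead of Integer counts.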
> > > > > ________________________________
> > > > > From: Roman Khachatryan <ro...@apache.org>
> > > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > > To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > > > > Cc: user@flink.apache.org <user@flink.apache.org>
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen with state processor API
> > > > >
> > > > > Hi Alexis,
> > > > >
> > > > > If I understand correctly, the provided StateProcessor program gives you the number of stream elements per operator. However, you mentioned that these operators have collection-type states (ListState and MapState). That means that per entry there can be an arbitrary number of state elements.
> > > > >
> > > > > Have you tried estimating the state sizes directly via readKeyedState [1]?
> > > > >
> > > > > > The other operator does override and call clear()
> > > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > > >
> > > > > [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-
> > > > > [2] https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-
> > > > >
> > > > > Regards,
> > > > > Roman
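A sketch of the readKeyedState approach from [1] above, assuming an operator whose keyed state is a ListState<Pojo> registered under the given descriptor name; the uid, descriptor name, key type, and Pojo are placeholders that have to match the job:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class KeyedListStateReader extends KeyedStateReaderFunction<String, Integer> {

    private transient ListState<Pojo> state;

    @Override
    public void open(Configuration parameters) {
        // The descriptor name and type must match the ones used by the job.
        state = getRuntimeContext().getListState(new ListStateDescriptor<>("descriptorId", Pojo.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Integer> out) throws Exception {
        int count = 0;
        for (Pojo ignored : state.get()) {
            count++;
        }
        out.collect(count);
    }
}

It would be invoked with something like savepoint.readKeyedState("operator-uid", new KeyedListStateReader()).collect(), and the counting loop could just as well sum serialized sizes instead.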
> > > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > > I have a streaming job running on Flink 1.14.4 that uses managed state with the RocksDB backend and incremental checkpoints. I’ve been monitoring a dev environment that has been running for the last week, and I noticed that state size and end-to-end duration have been increasing steadily. Currently, duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with the largest state (614MB) come from keyed sliding windows. Some attributes of this job’s setup:
> > > > > >
> > > > > > - Windows are 11 minutes in size.
> > > > > > - Slide time is 1 minute.
> > > > > > - Throughput is approximately 20 events per minute.
> > > > > >
> > > > > > I have 3 operators with these states:
> > > > > >
> > > > > > - Window state with ListState<Integer> and no TTL.
> > > > > > - Global window state with MapState<Long, List<String>> and a TTL of 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > > - Global window state with ListState<Pojo>, where the Pojo has an int and a long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) as well.
> > > > > >
> > > > > > Both operators with global window state have logic to manually remove old state in addition to the configured TTL. The other operator does override and call clear().
> > > > > >
> > > > > > I have now analyzed the checkpoint folder with the state processor API, and I’ll note here that I see 50 folders named chk-*** even though I don’t set state.checkpoints.num-retained and the default should be 1. I loaded the data from the folder with the highest chk number, and I see that my operators have these amounts respectively:
> > > > > >
> > > > > > - 10 entries
> > > > > > - 80 entries
> > > > > > - 200 entries
> > > > > >
> > > > > > I got those numbers with something like this:
> > > > > >
> > > > > > savepoint
> > > > > >     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
> > > > > >     .process(...)
> > > > > >     .collect()
> > > > > >     .parallelStream()
> > > > > >     .reduce(0, Integer::sum);
> > > > > >
> > > > > > My WindowReaderFunction classes just count the number of entries in each call to readWindow.
> > > > > >
> > > > > > Those amounts cannot possibly account for 614MB, so what am I missing?
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.