Hi David,

If I read Stephan's comment correctly, TTL doesn't work well in cases where we have too many levels, such as fast-growing state, because compaction doesn't clean up the higher-level SST files in time. Is this correct? If yes, should we register a timer with the TTL time and manually clean up the state (state.clear()) when the timer fires?

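To make the reasoning above concrete, here is a toy sketch in plain Java. It is not RocksDB's actual compaction logic and uses no Flink or RocksDB API; the class `TtlCompactionToyModel`, the `SstFile` type, the `simulate` method, and the entry counts (1000 and 10) are all hypothetical names and numbers chosen for illustration. The only assumption it encodes is the one from Stephan's comment: expired entries are dropped only from files that a compaction actually rewrites.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: NOT RocksDB's real compaction algorithm. */
final class TtlCompactionToyModel {

    /** One SST file, reduced to the write day of each entry it holds. */
    static final class SstFile {
        final List<Long> writeDays;

        SstFile(List<Long> writeDays) {
            this.writeDays = new ArrayList<>(writeDays);
        }

        int size() {
            return writeDays.size();
        }
    }

    /** Rewrites one file and keeps only entries younger than ttlDays (the "TTL filter"). */
    static SstFile compact(SstFile input, long nowDay, long ttlDays) {
        List<Long> kept = new ArrayList<>();
        for (long day : input.writeDays) {
            if (nowDay - day < ttlDays) {
                kept.add(day);
            }
        }
        return new SstFile(kept);
    }

    /** Returns {entries left in the shallow level, entries left in the deep level}. */
    static int[] simulate(int days, long ttlDays, boolean alsoCompactDeepLevel) {
        // Day 0: a large initial burst that has already been pushed to a deep level.
        List<Long> burst = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            burst.add(0L);
        }
        SstFile deep = new SstFile(burst);
        SstFile shallow = new SstFile(new ArrayList<>());

        for (long day = 1; day <= days; day++) {
            // Moderate daily writes land in the shallow level.
            for (int i = 0; i < 10; i++) {
                shallow.writeDays.add(day);
            }
            // The shallow level is rewritten every day, so its expired entries are filtered.
            shallow = compact(shallow, day, ttlDays);
            // The deep level is filtered only if something triggers a compaction on it.
            if (alsoCompactDeepLevel) {
                deep = compact(deep, day, ttlDays);
            }
        }
        return new int[] {shallow.size(), deep.size()};
    }

    public static void main(String[] args) {
        int[] stuck = simulate(30, 1, false);
        int[] cleaned = simulate(30, 1, true);
        System.out.println("deep level never compacted: shallow=" + stuck[0] + " deep=" + stuck[1]);
        System.out.println("deep level compacted too:   shallow=" + cleaned[0] + " deep=" + cleaned[1]);
    }
}
```

In this model the deep file keeps every expired entry until something rewrites it, which is the situation the compaction logs should confirm or rule out.
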
I will turn on RocksDB logging as well as compaction logging [1] to verify this.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction

On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:

> Hi Tao,
>
> my intuition is that the compaction of SST files is not triggering. By
> default, it's only triggered by the size ratios of different levels [1] and
> the TTL mechanism has no effect on it.
>
> Some reasoning from Stephan:
>
>> It's very likely to have large files in higher levels that haven't been
>> compacted in a long time and thus just stay around.
>>
>> This might be especially possible if you insert a lot in the beginning
>> (build up many levels) and then have a moderate rate of modifications, so
>> the changes and expiration keep happening purely in the merges /
>> compactions of the first levels. Then the later levels may stay unchanged
>> for quite some time.
>
> You should be able to see compaction details by setting RocksDB logging to
> INFO [2]. Can you please check these and validate whether this really is
> the case?
>
> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
> [2] https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>
> Best,
> D.
>
> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi team
>>
>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>> The TTL update type is OnCreateAndWrite. We set the value state when the
>> value state doesn't exist and we never update it again after the state is
>> not empty. The key of the value state is timestamp. My understanding of
>> such TTL settings is that the size of all SST files remains flat (let's
>> disregard the impact space amplification brings) after 1 day as the daily
>> data volume is more or less the same.
>> However, the RocksDB native metrics show that the SST files have
>> continued to grow since I started the job. I checked the SST files in
>> local storage and I can see SST files dated 1 month ago (when I started
>> the job). What is the possible reason for the SST files not being
>> cleaned up?
>>
>> The Flink version is 1.12.1
>> State backend is RocksDB with incremental checkpoint
>> All default configuration for RocksDB
>> Per job mode in Yarn and checkpoint to S3
>>
>>
>> Here is the code to set value state
>>
>> public void open(Configuration parameters) {
>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>             .newBuilder(Time.days(1))
>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>             .cleanupInRocksdbCompactFilter(300_000)
>>             .build();
>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>             new ValueStateDescriptor<>("click", Click.class);
>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>
>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>             .newBuilder(Time.days(1))
>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>             .build();
>>     ValueStateDescriptor<A> adsStateDescriptor =
>>             new ValueStateDescriptor<>("ads", slimAdsClass);
>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>> }
>>
>> @Override
>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>>         Collector<A> collector) throws Exception {
>>     if (tuple.f1 != null) {
>>         Click click = tuple.f1;
>>
>>         if (clickState.value() != null) {
>>             return;
>>         }
>>
>>         clickState.update(click);
>>
>>         A adsFromState = adsState.value();
>>         if (adsFromState != null) {
>>             collector.collect(adsFromState);
>>         }
>>     } else {
>>         A ads = tuple.f2;
>>
>>         if (adsState.value() != null) {
>>             return;
>>         }
>>
>>         adsState.update(ads);
>>
>>         Click clickFromState = clickState.value();
>>         if (clickFromState != null) {
>>             collector.collect(ads);
>>         }
>>     }
>> }
>>
>>
>> Here is the snippet of sst files in local storage
>>
>> [root@xxxx db]# ll | head -n10
>> total 76040068
>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>> --
>> Regards,
>> Tao
>>
>

--
Regards,
Tao