Hi David,

I confirmed with the RocksDB log that Stephan's observation is the root
cause: compaction doesn't clean up the high-level SST files fast enough.
Do you think manual clean-up by registering a timer is the way to go, or
is there a RocksDB parameter that can be tuned to mitigate this issue?
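For context, this is roughly the timer-based clean-up I have in mind; a
minimal sketch only, assuming a KeyedProcessFunction and reusing the Click
type from the job below (the class name, key type, and simplified input are
illustrative, not the actual job code):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative function: registers a processing-time timer one TTL period
// after the state is first written and clears the state explicitly when the
// timer fires, instead of relying on the TTL compaction filter alone.
public class ManualTtlCleanupFunction
        extends KeyedProcessFunction<String, Click, Click> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // 1 day

    private transient ValueState<Click> clickState;

    @Override
    public void open(Configuration parameters) {
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", Click.class));
    }

    @Override
    public void processElement(Click click, Context ctx,
            Collector<Click> out) throws Exception {
        if (clickState.value() == null) {
            clickState.update(click);
            // Fire once, one TTL period after the state was created.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TTL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Click> out) {
        // Manual clean-up: deletes the entry for the current key, so the
        // tombstone is written regardless of when compaction runs.
        clickState.clear();
    }
}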
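As for RocksDB parameters: the knob I'm aware of for exactly this symptom
is periodic compaction, which forces files that haven't been compacted for
a given time back into a compaction, so the TTL compaction filter visits
the cold high-level files again. Below is a sketch of wiring it in through
a custom RocksDBOptionsFactory. One caveat: setPeriodicCompactionSeconds is
a RocksDB 6.x option, and I haven't verified that the FRocksDB bundled with
Flink 1.12 exposes it, so please treat this as an assumption to check:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch of a custom options factory; verify that the bundled RocksDB
// version supports periodic compaction before relying on this.
public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
            Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
            Collection<AutoCloseable> handlesToClose) {
        // Re-compact any SST file older than ~1 day, so expired entries in
        // the higher levels are eventually dropped by the TTL filter.
        return currentOptions.setPeriodicCompactionSeconds(24 * 60 * 60);
    }
}

The factory would then be registered on the state backend with
setRocksDBOptions(new PeriodicCompactionOptionsFactory()).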
On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:

> Hi David,
>
> If I read Stephan's comment correctly, TTL doesn't work well for cases
> where we have too many levels, like fast-growing state, as compaction
> doesn't clean up high-level SST files in time. Is this correct? If yes,
> should we register a timer with the TTL time and manually clean up the
> state (state.clear()) when the timer fires?
>
> I will turn on RocksDB logging as well as compaction logging [1] to
> verify this.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
>
> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
>
>> Hi Tao,
>>
>> my intuition is that the compaction of the SST files is not being
>> triggered. By default, it is only triggered by the size ratios of the
>> different levels [1], and the TTL mechanism has no effect on it.
>>
>> Some reasoning from Stephan:
>>
>> It's very likely to have large files in higher levels that haven't been
>>> compacted in a long time and thus just stay around.
>>>
>>> This might be especially possible if you insert a lot in the beginning
>>> (build up many levels) and then have a moderate rate of modifications,
>>> so the changes and expiration keep happening purely in the merges /
>>> compactions of the first levels. Then the later levels may stay
>>> unchanged for quite some time.
>>>
>>
>> You should be able to see compaction details by setting RocksDB logging
>> to INFO [2]. Can you please check these and validate whether this
>> really is the case?
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>> [2]
>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>
>> Best,
>> D.
>>
>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>>
>>> Hi team,
>>>
>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>> The TTL update type is OnCreateAndWrite. We set the value state when
>>> it doesn't exist and we never update it again once the state is
>>> non-empty. The key of the value state is a timestamp. My understanding
>>> of such TTL settings is that the size of all SST files remains flat
>>> (let's disregard the impact space amplification brings) after 1 day,
>>> as the daily data volume is more or less the same. However, the
>>> RocksDB native metrics show that the SST files have kept growing since
>>> I started the job. I checked the SST files in local storage and can
>>> see SST files dating back one month, to when I started the job. What
>>> is the possible reason for the SST files not being cleaned up?
>>>
>>> The Flink version is 1.12.1.
>>> State backend is RocksDB with incremental checkpoints.
>>> All default configuration for RocksDB.
>>> Per-job mode in YARN and checkpointing to S3.
>>>
>>> Here is the code that sets the value state:
>>>
>>> public void open(Configuration parameters) {
>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>             .newBuilder(Time.days(1))
>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>             .build();
>>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>>             new ValueStateDescriptor<>("click", Click.class);
>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>
>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>             .newBuilder(Time.days(1))
>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>             .build();
>>>     ValueStateDescriptor<A> adsStateDescriptor =
>>>             new ValueStateDescriptor<>("ads", slimAdsClass);
>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>> }
>>>
>>> @Override
>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>>>         Collector<A> collector) throws Exception {
>>>     if (tuple.f1 != null) {
>>>         Click click = tuple.f1;
>>>
>>>         if (clickState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         clickState.update(click);
>>>
>>>         A adsFromState = adsState.value();
>>>         if (adsFromState != null) {
>>>             collector.collect(adsFromState);
>>>         }
>>>     } else {
>>>         A ads = tuple.f2;
>>>
>>>         if (adsState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         adsState.update(ads);
>>>
>>>         Click clickFromState = clickState.value();
>>>         if (clickFromState != null) {
>>>             collector.collect(ads);
>>>         }
>>>     }
>>> }
>>>
>>> Here is a snippet of the SST files in local storage:
>>>
>>> [root@xxxx db]# ll | head -n10
>>> total 76040068
>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>
> --
> Regards,
> Tao

--
Regards,
Tao