Thanks for the feedback! However, TTL alone has already proven unable to clean the state up in time, because too many levels have built up in RocksDB.

Hi @Yun Tang <myas...@live.com>, do you have any suggestions for tuning RocksDB to accelerate compaction?
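For reference, a minimal sketch of what the "periodic compaction" option mentioned below could look like, wired in through a custom options factory. This assumes Flink 1.14+, whose bundled RocksJava exposes setPeriodicCompactionSeconds; the factory name and the 1-day interval are illustrative, not taken from this thread:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch: schedule compaction of old files so higher levels get rewritten
// (and TTL-expired entries dropped) even without size-ratio triggers.
public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // no DB-level changes needed
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Files older than this are picked up for compaction; 1 day matches the state TTL here.
        return currentOptions.setPeriodicCompactionSeconds(24L * 60 * 60);
    }
}

// Usage sketch:
//   EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
//   backend.setRocksDBOptions(new PeriodicCompactionOptionsFactory());
//   env.setStateBackend(backend);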
On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:

> Cleaning up with timers should solve this. Both approaches have some
> advantages and disadvantages though.
>
> Timers:
> - No "side effects".
> - Can be set in event time. Deletes are regular tombstones that will get
> compacted later on.
>
> TTL:
> - Performance. This costs literally nothing compared to keeping an extra
> state for the timer + writing a tombstone marker.
> - Has "side effects", because it works in processing time. This is just
> something to keep in mind, e.g. when bootstrapping the state from
> historical data (large event-time / processing-time skew).
>
> With the 1.14 release, we've bumped the RocksDB version, so it may be
> possible to use "periodic compaction" [1], but nobody has tried that so
> far. In the meantime I think there is no real workaround, because we don't
> expose a way to trigger a manual compaction.
>
> I'm off on vacation until the 27th and won't be responsive during that
> time. I'd like to pull Yun into the conversation, as he's super familiar
> with the RocksDB state backend.
>
> [1]
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction
>
> Best,
> D.
>
> On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi David,
>>
>> The RocksDB log confirms Stephan's observation as the root cause:
>> compaction doesn't clean up the high-level SST files fast enough. Do you
>> think manual cleanup by registering a timer is the way to go, or can any
>> RocksDB parameter be tuned to mitigate this issue?
>>
>> On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
>>
>>> Hi David,
>>>
>>> If I read Stephan's comment correctly, TTL doesn't work well for cases
>>> where we have too many levels (e.g. fast-growing state), as compaction
>>> doesn't clean up high-level SST files in time. Is this correct? If so,
>>> should we register a timer with the TTL time and manually clean up the
>>> state (state.clear()) when the timer fires?
>>>
>>> I will turn on RocksDB logging as well as compaction logging [1] to
>>> verify this.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
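For reference, a minimal sketch of the timer-based cleanup being discussed, assuming a KeyedProcessFunction; the class, state name, and types are illustrative, and the 1-day delay simply mirrors the OnCreateAndWrite TTL from this thread:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: replace TTL with an explicit processing-time timer per key.
// state.clear() writes a tombstone that later compactions remove.
public class TimerCleanupFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("payload", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (state.value() == null) {
            state.update(value);
            // One cleanup timer per key, one day after the first write.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + Time.days(1).toMilliseconds());
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Fires per key; explicitly drops the state instead of waiting for TTL compaction.
        state.clear();
    }
}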
>>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Tao,
>>>>
>>>> My intuition is that the compaction of the SST files is not triggering.
>>>> By default, it's only triggered by the size ratios of the different
>>>> levels [1], and the TTL mechanism has no effect on it.
>>>>
>>>> Some reasoning from Stephan:
>>>>
>>>>> It's very likely to have large files in higher levels that haven't been
>>>>> compacted in a long time and thus just stay around.
>>>>>
>>>>> This might be especially possible if you insert a lot in the beginning
>>>>> (building up many levels) and then have a moderate rate of
>>>>> modifications, so the changes and expiration keep happening purely in
>>>>> the merges / compactions of the first levels. Then the later levels may
>>>>> stay unchanged for quite some time.
>>>>
>>>> You should be able to see compaction details by setting RocksDB logging
>>>> to INFO [2]. Can you please check these and validate whether this really
>>>> is the case?
>>>>
>>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>>> [2]
>>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>>
>>>> Best,
>>>> D.
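For reference, a minimal sketch of raising the RocksDB log level along the lines of [2], via a custom options factory; the class name, log directory, and rotation settings are illustrative:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

// Sketch: raise RocksDB's log verbosity so compaction decisions show up in LOG.
public class VerboseRocksDbLogFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                // Illustrative: keep the LOG somewhere stable and bounded in size,
                // since INFO-level logs grow quickly.
                .setDbLogDir("/tmp/rocksdb-logs")
                .setMaxLogFileSize(64 * 1024 * 1024) // rotate at 64 MB
                .setKeepLogFileNum(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}

// Registered e.g. via RocksDBStateBackend#setRocksDBOptions(...) or the
// state.backend.rocksdb.options-factory config key.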
>>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> We have a job that uses value state with RocksDB and a TTL set to 1 day.
>>>>> The TTL update type is OnCreateAndWrite. We set the value state when it
>>>>> doesn't exist, and we never update it again once it is non-empty. The
>>>>> key of the value state is a timestamp. My understanding of these TTL
>>>>> settings is that the total size of all SST files should remain flat
>>>>> (disregarding the impact of space amplification) after 1 day, as the
>>>>> daily data volume is more or less the same. However, the RocksDB native
>>>>> metrics show that the SST files have kept growing since I started the
>>>>> job. I checked the SST files in local storage and can see SST files that
>>>>> are a month old (from when I started the job). What could be the reason
>>>>> the SST files are not cleaned up?
>>>>>
>>>>> The Flink version is 1.12.1.
>>>>> The state backend is RocksDB with incremental checkpoints.
>>>>> All RocksDB configuration is default.
>>>>> Per-job mode on YARN, checkpointing to S3.
>>>>>
>>>>> Here is the code that sets the value state:
>>>>>
>>>>> public void open(Configuration parameters) {
>>>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>>>             .newBuilder(Time.days(1))
>>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>>>             .build();
>>>>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>>>>             new ValueStateDescriptor<>("click", Click.class);
>>>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>>>
>>>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>>>             .newBuilder(Time.days(1))
>>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>>>             .build();
>>>>>     ValueStateDescriptor<A> adsStateDescriptor =
>>>>>             new ValueStateDescriptor<>("ads", slimAdsClass);
>>>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>>>>>         Collector<A> collector) throws Exception {
>>>>>     if (tuple.f1 != null) {
>>>>>         Click click = tuple.f1;
>>>>>
>>>>>         if (clickState.value() != null) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         clickState.update(click);
>>>>>
>>>>>         A adsFromState = adsState.value();
>>>>>         if (adsFromState != null) {
>>>>>             collector.collect(adsFromState);
>>>>>         }
>>>>>     } else {
>>>>>         A ads = tuple.f2;
>>>>>
>>>>>         if (adsState.value() != null) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         adsState.update(ads);
>>>>>
>>>>>         Click clickFromState = clickState.value();
>>>>>         if (clickFromState != null) {
>>>>>             collector.collect(ads);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Here is a snippet of the SST files in local storage:
>>>>>
>>>>> [root@xxxx db]# ll | head -n10
>>>>> total 76040068
>>>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Tao
>>>
>>> --
>>> Regards,
>>> Tao
>>
>> --
>> Regards,
>> Tao

--
Regards,
Tao