Hi David,

I confirmed with the RocksDB log that Stephan's observation is the root
cause: compaction doesn't clean up the high-level SST files fast enough.
Do you think manual clean-up by registering a timer is the way to go, or
is there a RocksDB parameter that can be tuned to mitigate this issue?
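For reference, here is roughly what I have in mind for the timer
approach. This is only a minimal sketch: the placeholder String types
stand in for our real Click/A classes, and TTL_MS mirrors the 1-day TTL
of the job below.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ManualTtlFunction
        extends KeyedProcessFunction<String, String, String> {

    // 1 day, mirroring the StateTtlConfig of the job below
    private static final long TTL_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<String> clickState;

    @Override
    public void open(Configuration parameters) {
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", String.class));
    }

    @Override
    public void processElement(String value, Context ctx,
            Collector<String> out) throws Exception {
        if (clickState.value() == null) {
            clickState.update(value);
            // fire once, one TTL period after the state entry is created
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TTL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // manual cleanup: writes a tombstone that compaction
        // will eventually drop
        clickState.clear();
    }
}

One concern is the overhead: this registers a processing-time timer per
key, and the timers themselves live in state, so it may just trade one
kind of growth for another.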

On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:

> Hi David,
>
> If I read Stephan's comment correctly, TTL doesn't work well for cases
> where we have too many levels, e.g. fast-growing state, as compaction
> doesn't clean up the high-level SST files in time. Is this correct? If
> so, should we register a timer with the TTL duration and manually clean
> up the state (state.clear()) when the timer fires?
>
> I will turn on RocksDB logging as well as compaction logging [1] to
> verify this.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
>
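As a side note on [1] above: the compaction filter's own debug output
can also be enabled via log4j. With a log4j 1.x properties file that
should be:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG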
> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
>
>> Hi Tao,
>>
>> my intuition is that the compaction of the SST files is not being
>> triggered. By default, compaction is only triggered by the size ratios
>> of the different levels [1], and the TTL mechanism has no effect on it.
>>
>> Some reasoning from Stephan:
>>
>>> It's very likely that there are large files in the higher levels that
>>> haven't been compacted in a long time and thus just stay around.
>>>
>>> This is especially likely if you insert a lot in the beginning
>>> (building up many levels) and then have a moderate rate of
>>> modifications, so that changes and expiration keep happening purely in
>>> the merges / compactions of the first levels. The later levels may
>>> then stay unchanged for quite some time.
>>>
>>
>> You should be able to see the compaction details by setting RocksDB
>> logging to INFO [2]. Can you please check the logs and validate whether
>> this really is the case?
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>> [2]
>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>
>> Best,
>> D.
>>
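In case the tuning route makes sense, this is the kind of
RocksDBOptionsFactory I would experiment with. It is only a sketch: the
values are illustrative rather than recommendations, and the more direct
knob for stale files (periodic compaction) only exists in newer RocksDB
builds, not, as far as I know, in the FRocksDB bundled with Flink 1.12.

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class TunedOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
            Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                // surface compaction details in the LOG file, as in [2]
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                // dump compaction/level statistics every 5 minutes
                .setStatsDumpPeriodSec(300);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions,
            Collection<AutoCloseable> handlesToClose) {
        // trigger L0 -> L1 compaction after fewer files than the
        // default of 4, so changes and tombstones propagate to the
        // higher levels sooner
        return currentOptions.setLevel0FileNumCompactionTrigger(2);
    }
}

The factory would then be registered on the state backend the job
already uses, via RocksDBStateBackend#setRocksDBOptions(new
TunedOptionsFactory()).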
>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>>
>>> Hi team
>>>
>>> We have a job that uses value state with RocksDB and a TTL of 1 day.
>>> The TTL update type is OnCreateAndWrite. We set the value state when
>>> it doesn't exist, and we never update it again once it is non-empty.
>>> The key of the value state is a timestamp. My understanding of these
>>> TTL settings is that the total size of the SST files should remain
>>> flat after 1 day (disregarding the impact of space amplification), as
>>> the daily data volume is more or less the same. However, the RocksDB
>>> native metrics show that the SST files have kept growing since I
>>> started the job. I checked the SST files in local storage and I can
>>> see files that are a month old, dating back to when I started the job.
>>> What could be the reason that the SST files are not cleaned up?
>>>
>>> The Flink version is 1.12.1.
>>> The state backend is RocksDB with incremental checkpointing.
>>> All RocksDB configuration is left at the defaults.
>>> The job runs in per-job mode on YARN and checkpoints to S3.
>>>
>>>
>>> Here is the code that sets the value state:
>>>
>>> public void open(Configuration parameters) {
>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>             .newBuilder(Time.days(1))
>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>             .build();
>>>     ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>
>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>             .newBuilder(Time.days(1))
>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>             .build();
>>>     ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>> }
>>>
>>> @Override
>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
>>>     if (tuple.f1 != null) {
>>>         Click click = tuple.f1;
>>>
>>>         if (clickState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         clickState.update(click);
>>>
>>>         A adsFromState = adsState.value();
>>>         if (adsFromState != null) {
>>>             collector.collect(adsFromState);
>>>         }
>>>     } else {
>>>         A ads = tuple.f2;
>>>
>>>         if (adsState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         adsState.update(ads);
>>>
>>>         Click clickFromState = clickState.value();
>>>         if (clickFromState != null) {
>>>             collector.collect(ads);
>>>         }
>>>     }
>>> }
>>>
>>>
>>> Here is a snippet of the SST files in local storage:
>>>
>>> [root@xxxx db]# ll | head -n10
>>> total 76040068
>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>> --
>>> Regards,
>>> Tao
>>>
>>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao
