Thanks for the feedback! However, the TTL approach has already shown that the
state cannot be cleaned up in time because too many levels have built up in
RocksDB.
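
For anyone reading along, here is a toy Java model of the effect Stephan
describes further down the thread. It is a sketch under heavy assumptions, not
how RocksDB actually compacts: the class TtlCompactionToy, its three levels,
the capacities and the TTL value are all made up for illustration. A compaction
here only filters the entries it moves down, which roughly matches keys that
never overlap older files (our key is a timestamp).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; this is NOT RocksDB's real compaction algorithm. */
final class TtlCompactionToy {
    static final long TTL = 100;           // an entry expires once it is older than this
    static final int[] CAPACITY = {4, 16}; // overflow thresholds for L0 and L1; L2 is unbounded

    // levels.get(i) holds the write timestamps of the entries stored in level i.
    final List<List<Long>> levels = new ArrayList<>();

    TtlCompactionToy() {
        for (int i = 0; i <= CAPACITY.length; i++) {
            levels.add(new ArrayList<>());
        }
    }

    /** Writes one entry at time now into L0, then compacts every level that overflowed. */
    void put(long now) {
        levels.get(0).add(now);
        for (int i = 0; i < CAPACITY.length; i++) {
            if (levels.get(i).size() > CAPACITY[i]) {
                compact(i, now);
            }
        }
    }

    /** Moves level i into level i + 1. The TTL filter only sees the entries being moved. */
    void compact(int i, long now) {
        for (long writeTime : levels.get(i)) {
            if (now - writeTime <= TTL) {
                levels.get(i + 1).add(writeTime); // still live, keep it
            }
        }
        levels.get(i).clear();
    }

    /** Hypothetical full compaction: rewrites every level, so the filter sees every entry. */
    void compactEverything(long now) {
        for (List<Long> level : levels) {
            level.removeIf(writeTime -> now - writeTime > TTL);
        }
    }

    /** Counts entries that are past their TTL but still take up space. */
    long expiredOnDisk(long now) {
        return levels.stream().flatMap(List::stream).filter(t -> now - t > TTL).count();
    }

    /** Counts all stored entries, expired or not. */
    int totalOnDisk() {
        return levels.stream().mapToInt(List::size).sum();
    }

    /** A burst of 200 writes at time 0, then one write per tick up to time 1000. */
    static TtlCompactionToy runScenario() {
        TtlCompactionToy db = new TtlCompactionToy();
        for (int n = 0; n < 200; n++) {
            db.put(0);
        }
        for (long t = 1; t <= 1000; t++) {
            db.put(t);
        }
        return db;
    }

    public static void main(String[] args) {
        TtlCompactionToy db = runScenario();
        System.out.println("total=" + db.totalOnDisk() + " expired=" + db.expiredOnDisk(1000));
        db.compactEverything(1000);
        System.out.println("after full compaction: total=" + db.totalOnDisk());
    }
}
```

In this toy every entry reaches the last level while it is still fresh, so the
filter never gets a chance to drop it: at time 1000 there are 1200 entries
stored and 1099 of them are expired. Only the made-up full compaction brings
the total down to the 101 live entries.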

Hi @Yun Tang <myas...@live.com>, do you have any suggestions for tuning RocksDB
to speed up compaction?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:

> Cleaning up with timers should solve this. Both approaches have some
> advantages and disadvantages though.
>
> Timers:
> - No "side effects".
> - Can be set in event time. Deletes are regular tombstones that will get
> compacted later on.
>
> TTL:
> - Performance. This costs literally nothing compared to an extra state for
> timer + writing a tombstone marker.
> - Has "side effects", because it works in processing time. This is just
> something to keep in mind, e.g. when bootstrapping the state from historical
> data (large event time / processing time skew).
>
> With 1.14 release, we've bumped the RocksDB version so it may be possible
> to use a "periodic compaction" [1], but nobody has tried that so far. In
> the meantime I think there is no real workaround because we don't expose a
> way to trigger manual compaction.
>
> I'm off to vacation until 27th and I won't be responsive during that time.
> I'd like to pull Yun into the conversation as he's super familiar with the
> RocksDB state backend.
>
> [1]
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction
>
> Best,
> D.
>
> On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi David,
>>
>> The RocksDB log confirms Stephan's observation as the root cause:
>> compaction doesn't clean up the high-level SST files fast enough. Do you
>> think manual cleanup by registering a timer is the way to go, or can some
>> RocksDB parameter be tuned to mitigate this issue?
>>
>> On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
>>
>>> Hi David,
>>>
>>> If I read Stephan's comment correctly, TTL doesn't work well for cases
>>> where we have too many levels, such as fast-growing state, because
>>> compaction doesn't clean up high-level SST files in time. Is this correct?
>>> If yes, should we register a timer with the TTL time and manually clean up
>>> the state (state.clear()) when the timer fires?
>>>
>>> I will turn on RocksDB logging as well as compaction logging [1] to
>>> verify this.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>
>>>
>>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Tao,
>>>>
>>>> my intuition is that the compaction of SST files is not triggering. By
>>>> default, it's only triggered by the size ratios of different levels [1] and
>>>> the TTL mechanism has no effect on it.
>>>>
>>>> Some reasoning from Stephan:
>>>>
>>>>> It's very likely to have large files in higher levels that haven't been
>>>>> compacted in a long time and thus just stay around.
>>>>>
>>>>> This might be especially possible if you insert a lot in the beginning
>>>>> (build up many levels) and then have a moderate rate of modifications, so
>>>>> the changes and expiration keep happening purely in the merges /
>>>>> compactions of the first levels. Then the later levels may stay unchanged
>>>>> for quite some time.
>>>>>
>>>>
>>>> You should be able to see compaction details by setting RocksDB logging
>>>> to INFO [2]. Can you please check these and validate whether this really is
>>>> the case?
>>>>
>>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>>> [2]
>>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
>>>>
>>>>> Hi team
>>>>>
>>>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>>>> The TTL update type is OnCreateAndWrite. We set the value state only when
>>>>> it doesn't exist yet, and we never update it again once it is non-empty.
>>>>> The key of the value state is a timestamp. My understanding of such TTL
>>>>> settings is that the total size of the SST files should stay flat after
>>>>> 1 day (disregarding the impact of space amplification), as the daily
>>>>> data volume is more or less the same. However, the RocksDB native metrics
>>>>> show that the SST files have kept growing since I started the job. I
>>>>> checked the SST files in local storage and can see SST files dating from
>>>>> 1 month ago (when I started the job). What could be the reason the SST
>>>>> files are not cleaned up?
>>>>>
>>>>> The Flink version is 1.12.1
>>>>> State backend is RocksDB with incremental checkpoint
>>>>> All default configuration for RocksDB
>>>>> Per job mode in Yarn and checkpoint to S3
>>>>>
>>>>>
>>>>> Here is the code to set value state
>>>>>
>>>>> public void open(Configuration parameters) {
>>>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>>>             .newBuilder(Time.days(1))
>>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>>>             .build();
>>>>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>>>>             new ValueStateDescriptor<>("click", Click.class);
>>>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>>>
>>>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>>>             .newBuilder(Time.days(1))
>>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>>>             .build();
>>>>>     ValueStateDescriptor<A> adsStateDescriptor =
>>>>>             new ValueStateDescriptor<>("ads", slimAdsClass);
>>>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
>>>>>         Collector<A> collector) throws Exception {
>>>>>     if (tuple.f1 != null) {
>>>>>         Click click = tuple.f1;
>>>>>
>>>>>         if (clickState.value() != null) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         clickState.update(click);
>>>>>
>>>>>         A adsFromState = adsState.value();
>>>>>         if (adsFromState != null) {
>>>>>             collector.collect(adsFromState);
>>>>>         }
>>>>>     } else {
>>>>>         A ads = tuple.f2;
>>>>>
>>>>>         if (adsState.value() != null) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         adsState.update(ads);
>>>>>
>>>>>         Click clickFromState = clickState.value();
>>>>>         if (clickFromState != null) {
>>>>>             collector.collect(ads);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> Here is the snippet of sst files in local storage
>>>>>
>>>>> [root@xxxx db]# ll | head -n10
>>>>> total 76040068
>>>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>>> --
>>>>> Regards,
>>>>> Tao
>>>>>
>>>>
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao
