Hi team,

We have a job that uses value state with RocksDB and a TTL of 1 day. The
TTL update type is OnCreateAndWrite. We set the value state only when it
doesn't exist yet, and we never update it again once it is non-empty. The
key of the value state is a timestamp. My understanding of these TTL
settings is that the total size of the SST files should stay roughly flat
after 1 day (disregarding space amplification), since the daily data volume
is more or less constant. However, the RocksDB native metrics show that the
SST files have kept growing ever since I started the job. When I check the
SST files in local storage, I can see files that are one month old, dating
back to when the job started. What could be the reason the SST files are
not being cleaned up?

The Flink version is 1.12.1.
State backend is RocksDB with incremental checkpoints.
All default configuration for RocksDB.
Per-job mode on YARN, with checkpoints going to S3.
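
For reference, the relevant flink-conf.yaml entries look roughly like this
(a sketch; the two metrics keys are assumptions about how the native SST
metrics could be enabled, the rest reflects the setup above):

# state backend and checkpointing, per the setup described above
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://<bucket>/checkpoints
# RocksDB native metrics (assumed keys) used to watch SST file size
state.backend.rocksdb.metrics.total-sst-files-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true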


Here is the code that sets up the value state:

@Override
public void open(Configuration parameters) {
    StateTtlConfig ttlConfigClick = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            // Query the current timestamp in the compaction filter after
            // every 300,000 processed state entries.
            .cleanupInRocksdbCompactFilter(300_000)
            .build();
    ValueStateDescriptor<Click> clickStateDescriptor =
            new ValueStateDescriptor<>("click", Click.class);
    clickStateDescriptor.enableTimeToLive(ttlConfigClick);
    clickState = getRuntimeContext().getState(clickStateDescriptor);

    StateTtlConfig ttlConfigAds = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            // Same TTL, but the timestamp is only refreshed after every
            // 30,000,000 processed entries.
            .cleanupInRocksdbCompactFilter(30_000_000)
            .build();
    ValueStateDescriptor<A> adsStateDescriptor =
            new ValueStateDescriptor<>("ads", slimAdsClass);
    adsStateDescriptor.enableTimeToLive(ttlConfigAds);
    adsState = getRuntimeContext().getState(adsStateDescriptor);
}

@Override
public void processElement(Tuple3<String, Click, A> tuple, Context ctx,
        Collector<A> collector) throws Exception {
    if (tuple.f1 != null) {
        Click click = tuple.f1;

        // Write the click state only once; never update it afterwards.
        if (clickState.value() != null) {
            return;
        }

        clickState.update(click);

        // If the matching ads record already arrived, emit it.
        A adsFromState = adsState.value();
        if (adsFromState != null) {
            collector.collect(adsFromState);
        }
    } else {
        A ads = tuple.f2;

        // Write the ads state only once; never update it afterwards.
        if (adsState.value() != null) {
            return;
        }

        adsState.update(ads);

        // If the matching click already arrived, emit the ads record.
        Click clickFromState = clickState.value();
        if (clickFromState != null) {
            collector.collect(ads);
        }
    }
}
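
For completeness, the function is wired into the job roughly as below (a
sketch; the class name ClickAdsJoinFunction and the "input" variable are
placeholder names, but the stream is keyed by the tuple's first field,
i.e. the timestamp key mentioned above):

// Sketch only: ClickAdsJoinFunction and "input" are hypothetical names.
DataStream<A> joined = input
        .keyBy(t -> t.f0)                     // timestamp key, as described above
        .process(new ClickAdsJoinFunction()); // the KeyedProcessFunction shown above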


Here is a snippet of the SST files in local storage:

[root@xxxx db]# ll | head -n10
total 76040068
-rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
-rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
-rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
-rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
-rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
-rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
-rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
-rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
-rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
-- 
Regards,
Tao
