Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里
<https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl>对
State TTL 的描述;

On Mon, May 15, 2023 at 11:05 AM lxk <lxk7...@163.com> wrote:

> flink版本:1.14
> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
> 在主程序中,我设置了状态过期策略:
>    SingleOutputStreamOperator<AdvertiseClick> baiduStream =
> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
> AdvertiseClick.class)).name("BaiDuAdClick");
> MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
> MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
> StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.days(7))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .cleanupFullSnapshot()
> .cleanupIncrementally(200, true)
> .build();
> baiduInfoMap.enableTimeToLive(ttlConfig);
> 在BroadcastProcessFunction中,我也设置了状态清除策略:
> public void open(Configuration parameters) throws Exception {
> jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
> baiduInfoDesc = new MapStateDescriptor<String,
> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class);
> StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.days(7))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .cleanupFullSnapshot()
> .cleanupIncrementally(200, true)
> .build();
> baiduInfoDesc.enableTimeToLive(ttlConfig);
>
> }
> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。
>
>
> https://pic2.imgdb.cn/item/64619fef0d2dde577774d4c6.jpg
>
>
>
>
> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。



-- 
Best,
Hangxiang.

回复