flink版本:1.14
目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。
在主程序中,我设置了状态过期策略:
   SingleOutputStreamOperator<AdvertiseClick> baiduStream = 
env.addSource(adBaiduClick).map(data -> JSON.parseObject(data, 
AdvertiseClick.class)).name("BaiDuAdClick");
MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new 
MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.cleanupIncrementally(200, true)
.build();
baiduInfoMap.enableTimeToLive(ttlConfig);
在BroadcastProcessFunction中,我也设置了状态清除策略:
public void open(Configuration parameters) throws Exception {
jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
baiduInfoDesc = new MapStateDescriptor<String, AdvertiseClick>("advertiseInfo", 
String.class, AdvertiseClick.class);
StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.cleanupIncrementally(200, true)
.build();
baiduInfoDesc.enableTimeToLive(ttlConfig);

}
但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。


https://pic2.imgdb.cn/item/64619fef0d2dde577774d4c6.jpg




我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。

回复