按我的理解,参考aggregate(AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction)方法,
窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。

x <[email protected]> 于2020年8月25日周二 下午6:25写道:

>
> 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)……………
> .window(TumblingEventTimeWindows.of(Time.days(1)))
> .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
> .evictor(TimeEvictor.of(Time.seconds(0), true))
> .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
> private var state: MapState[String,Boolean] = _
> override def open
> override def process
> override def clear(ctx: Context): Unit = {
>             state.clear()
> }
> }

回复