按我的理解,参考aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)方法, 窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。
x <[email protected]> 于2020年8月25日周二 下午6:25写道: > > 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…………… > .window(TumblingEventTimeWindows.of(Time.days(1))) > .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) > .evictor(TimeEvictor.of(Time.seconds(0), true)) > .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{ > private var state: MapState[String,Boolean] = _ > override def open > override def process > override def clear(ctx: Context): Unit = { > state.clear() > } > }
