This works with event-time as well. You need to set the right
TimeCharacteristics on the exec env and assign timestamps + watermarks. The
only time depended operation is the window. YourWindowFunction assigns the
timestamp of the window. WindowFunction.apply() has a TimeWindow parameter
that gives
I'm working with your suggestions, thank you very much. What I'm missing
here is what YourWindowFunction should do. I have no notion of event time
there and so I can't assign a timestamp. Also this solution seems to be
working by processing time, while I care about event time. I couldn't make
it ru
Hi Simone,
I think I have a solution for your problem:
val s: DataStream[(Long, Int, ts)] = ??? // (id, state, time)
val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
.keyBy(_._1) // key by id
.flatMap(new StateUpdater) // StateUpdater is a stateful FlatMapFunction.
It has a