Hi Jack,
Right now this is not possible except by writing a custom operator. We
are working on support for a time-to-live setting on state, which should
solve your problem.
For writing a custom operator, check out DataStream.transform() and
StreamMap, which is the operator implementation for Map.
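
To make the time-to-live idea concrete, here is a minimal sketch in plain Scala, without any Flink dependency. It assumes the goal is to drop a session's accumulated events once that session has been idle for too long. SessionStore, ttlMillis and the explicit `now` timestamps are hypothetical names for illustration only; they are not Flink classes or settings. In a custom operator, similar bookkeeping would sit in the code that handles each incoming element.

```scala
import scala.collection.mutable

// Same shapes as in the question, but with an immutable event list.
case class Event(sessionId: Int, data: String)
case class Session(id: Int, events: List[Event])

// Hypothetical helper, not a Flink class: keeps one Session per session ID
// and drops sessions that have been idle for longer than ttlMillis.
class SessionStore(ttlMillis: Long) {
  // session ID -> (session so far, timestamp of its latest event)
  private val state = mutable.Map.empty[Int, (Session, Long)]

  // Add an event observed at time `now` (milliseconds) and return the
  // updated session. Stale sessions are evicted first.
  def add(event: Event, now: Long): Session = {
    expire(now)
    val previous = state.get(event.sessionId).map(_._1.events).getOrElse(Nil)
    val updated = Session(event.sessionId, previous :+ event)
    state(event.sessionId) = (updated, now)
    updated
  }

  // Remove every session whose latest event is older than the TTL.
  def expire(now: Long): Unit = {
    val stale = state.collect {
      case (id, (_, lastSeen)) if now - lastSeen > ttlMillis => id
    }
    stale.foreach(id => state.remove(id))
  }

  // IDs of the sessions that currently hold state.
  def activeSessionIds: Set[Int] = state.keySet.toSet
}
```

With a TTL of 1000 ms, two events for session 1 at times 0 and 500 end up in the same Session, and an event for session 2 at time 2000 causes session 1 to be evicted because it has been idle for 1500 ms.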
Hi all,
I have an incoming stream of event objects, each with its session ID. I am
writing a task that aggregates the events by session. The general logic
looks like:
case class Event(sessionId: Int, data: String)
case class Session(id: Int, var events: List[Event])
val events = ... //some source
event