Hi Jack,

Right now this is not possible except by writing a custom operator. We are working on support for a time-to-live setting on state; this should solve your problem.
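To make the time-to-live idea concrete, here is a minimal plain-Scala sketch of TTL eviction for session state. It does not use any Flink API and is not how the upcoming feature is implemented. The names TtlSessionStore, update, evictExpired and ttlMillis are hypothetical. Each session records when it was last updated, and a periodic sweep drops the sessions that have not been touched within the TTL.

```scala
import scala.collection.mutable

// Same shapes as in the original message; Session is immutable here.
final case class Event(sessionId: Int, data: String)
final case class Session(id: Int, events: List[Event])

// Hypothetical helper (not a Flink API): per-key session state where every
// entry remembers the time of its last update.
final class TtlSessionStore(ttlMillis: Long) {
  private val state = mutable.Map.empty[Int, (Session, Long)]

  // Appends the event to its session and refreshes the last-update time.
  def update(event: Event, nowMillis: Long): Session = {
    val previous = state.get(event.sessionId)
      .map(_._1)
      .getOrElse(Session(event.sessionId, Nil))
    val updated = previous.copy(events = previous.events :+ event)
    state(event.sessionId) = (updated, nowMillis)
    updated
  }

  // Drops every session whose last update is at least ttlMillis old
  // and returns the IDs of the evicted sessions.
  def evictExpired(nowMillis: Long): Set[Int] = {
    val expired = state.collect {
      case (id, (_, lastUpdate)) if nowMillis - lastUpdate >= ttlMillis => id
    }.toSet
    state --= expired
    expired
  }

  def size: Int = state.size
}
```

With a one-day TTL, a session last touched at hour 0 is evicted by a sweep at hour 25, while one touched at hour 10 survives. In a real job, the current time and the trigger for the sweep would have to come from the operator; that wiring is not shown here.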
For writing a custom operator, check out DataStream.transform() and StreamMap, which is the operator implementation for Map.

Please let me know if you have any further questions.

Best,
Aljoscha

On Tue, 7 Jun 2016 at 03:05 Jack Huang <jackhu...@mz.com> wrote:

> Hi all,
>
> I have an incoming stream of event objects, each with its session ID. I am
> writing a task that aggregates the events by session. The general logic
> looks like this:
>
> case class Event(sessionId: Int, data: String)
> case class Session(id: Int, var events: List[Event])
>
> val events = ... // some source
> events
>   .keyBy((event: Event) => event.sessionId)
>   .mapWithState((event: Event, state: Option[Session]) => {
>     val session = state.getOrElse(Session(id = event.sessionId, events = List()))
>     session.events = session.events :+ event
>     (session, Some(session))
>   })
>
> The problem is that there is no reliable way of knowing the end of a
> session, since events are likely to get lost. If I keep this process
> running, the number of stored sessions will keep growing until it fills up
> the disk.
>
> Is there a recommended way of periodically evicting sessions that are too
> old (e.g. a day old)?
>
> Thanks,
> Jack