Hi all,

I have an incoming stream of event objects, each carrying a session ID. I am writing a task that aggregates the events by session. The general logic looks like this:

    case class Event(sessionId: Int, data: String)
    case class Session(id: Int, var events: List[Event])

    val events = ... // some source

    events
      .keyBy((event: Event) => event.sessionId)
      .mapWithState((event: Event, state: Option[Session]) => {
        // Reuse the session stored for this key, or start a new one
        val session = state.getOrElse(Session(id = event.sessionId, events = List()))
        session.events = session.events :+ event
        // Emit the updated session and keep it as the new state
        (session, Some(session))
      })

The problem is that there is no reliable way of knowing when a session ends, since events are likely to get lost. If I keep this process running, the number of stored sessions will keep growing until it fills up the disk.

Is there a recommended way of periodically evicting sessions that are too old (e.g. a day old)?

Thanks,
Jack
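
P.S. To make the behaviour I am after concrete, here is a minimal, framework-independent sketch of the eviction rule. It is only an illustration under assumptions: `SessionStore`, `lastSeen`, `ttlMillis` and the explicit `now` argument are hypothetical names of mine, not Flink API, and I am assuming "too old" means "no event received for longer than the TTL".

```scala
// Hypothetical sketch of the desired eviction semantics; not Flink API.
case class Event(sessionId: Int, data: String)

// lastSeen is an assumed extra field: the time (ms) of the latest event.
case class Session(id: Int, events: List[Event], lastSeen: Long)

object SessionStore {
  // Append an event to its session and refresh the session's last-seen time.
  def add(store: Map[Int, Session], event: Event, now: Long): Map[Int, Session] = {
    val session = store.getOrElse(event.sessionId, Session(event.sessionId, Nil, now))
    store.updated(
      event.sessionId,
      session.copy(events = session.events :+ event, lastSeen = now)
    )
  }

  // Keep only sessions that received an event within the last ttlMillis.
  def evict(store: Map[Int, Session], now: Long, ttlMillis: Long): Map[Int, Session] =
    store.filter { case (_, session) => now - session.lastSeen <= ttlMillis }
}
```

With a one-day TTL, a session whose last event arrived at time 0 would be dropped by an `evict` call made just after the one-day mark, while a session updated more recently would be kept. What I am missing is the recommended way to get this kind of periodic cleanup for the keyed state in the job above.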