Hi all,

I have an incoming stream of event objects, each with a session ID. I am
writing a task that aggregates the events by session. The general logic
looks like this:

case class Event(sessionId: Int, data: String)
case class Session(id: Int, var events: List[Event])

val events = ... // some source
events
  .keyBy((event: Event) => event.sessionId)
  .mapWithState((event: Event, state: Option[Session]) => {
    // Reuse the stored session for this key, or start a new one
    val session = state.getOrElse(Session(id = event.sessionId, events = List()))
    session.events = session.events :+ event
    (session, Some(session))
  })

The problem is that there is no reliable way of knowing the end of a
session, since events are likely to get lost. If I keep this process
running, the number of stored sessions will keep growing until it fills up
the disk.

Is there a recommended way of periodically evicting sessions that are too
old (e.g. a day old)?
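
To make the requirement concrete, the behaviour I am after could be sketched
in plain Scala roughly as below. This is only an illustration and does not use
any framework API: TimedSession, lastSeenMillis, SessionStore and maxAgeMillis
are hypothetical names, and it assumes a session's age is measured from its
most recent event.

```scala
// Hypothetical sketch in plain Scala (no streaming framework API involved).
case class Event(sessionId: Int, data: String)

// Assumed variant of Session that also records when it last received an event.
case class TimedSession(id: Int, events: List[Event], lastSeenMillis: Long)

object SessionStore {
  // Append an event to its session, creating the session if it does not exist yet.
  def update(store: Map[Int, TimedSession], event: Event, nowMillis: Long): Map[Int, TimedSession] = {
    val current = store.getOrElse(event.sessionId, TimedSession(event.sessionId, List(), nowMillis))
    store.updated(
      event.sessionId,
      current.copy(events = current.events :+ event, lastSeenMillis = nowMillis)
    )
  }

  // Keep only sessions whose last event is at most maxAgeMillis old.
  def evict(store: Map[Int, TimedSession], nowMillis: Long, maxAgeMillis: Long): Map[Int, TimedSession] =
    store.filter { case (_, session) => nowMillis - session.lastSeenMillis <= maxAgeMillis }
}
```

Calling evict periodically with maxAgeMillis set to one day would drop the
stale sessions; what I am missing is the right place to hook such a step into
the keyed state.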



Thanks,
Jack
