Hi Jack,
Right now this is not possible except by writing a custom operator. We
are working on support for a time-to-live setting on state, which should
solve your problem.

For writing a custom operator, check out DataStream.transform() and
StreamMap, which is the operator implementation for Map. Please let me know
if you have any further questions.
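
As a rough illustration of the bookkeeping such a custom operator would need,
here is a minimal plain-Scala sketch of time-based session eviction. It does
not use any Flink API: SessionStore, ttlMillis, lastSeen and evictExpired are
hypothetical names made up for this example, and the sessions live in an
ordinary in-memory map rather than in checkpointed operator state. It assumes
the caller supplies the current time with every call.

```scala
import scala.collection.mutable

// Hypothetical types for illustration; lastSeen is an added field, not part of the original Session.
case class Event(sessionId: Int, data: String)
case class Session(id: Int, events: List[Event], lastSeen: Long)

// Hypothetical helper, not a Flink class: keeps sessions and drops idle ones.
class SessionStore(ttlMillis: Long) {
  private val sessions = mutable.Map.empty[Int, Session]

  // Append the event to its session and refresh the session's last-seen time.
  def add(event: Event, now: Long): Session = {
    val previous = sessions.get(event.sessionId).map(_.events).getOrElse(List.empty)
    val updated = Session(event.sessionId, previous :+ event, now)
    sessions(event.sessionId) = updated
    updated
  }

  // Remove and return every session that has been idle for at least ttlMillis.
  def evictExpired(now: Long): List[Session] = {
    // Materialize the expired list first so the map is not modified while iterating.
    val expired = sessions.values.filter(s => now - s.lastSeen >= ttlMillis).toList
    expired.foreach(s => sessions.remove(s.id))
    expired
  }

  def size: Int = sessions.size
}

object SessionStoreDemo extends App {
  val oneDay = 24L * 60 * 60 * 1000
  val store = new SessionStore(ttlMillis = oneDay)

  store.add(Event(1, "a"), now = 0L)         // session 1 last seen at t = 0
  store.add(Event(2, "b"), now = oneDay / 2) // session 2 last seen half a day later

  // At t = one day, only session 1 has been idle for a full day.
  val evicted = store.evictExpired(now = oneDay)
  println(evicted.map(_.id))
  println(store.size)
}
```

Inside a real operator, the add step would run for each incoming element and
the eviction step would run periodically. Where exactly to hook that in
depends on the operator you write, so treat the above only as a sketch of the
logic.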

Best,
Aljoscha

On Tue, 7 Jun 2016 at 03:05 Jack Huang <jackhu...@mz.com> wrote:

> Hi all,
>
> I have an incoming stream of event objects, each with its session ID. I am
> writing a task that aggregates the events by session. The general logic
> looks like this:
>
> case class Event(sessionId: Int, data: String)
> case class Session(id: Int, var events: List[Event])
>
> val events = ... // some source
> events
>   .keyBy((event: Event) => event.sessionId)
>   .mapWithState((event: Event, state: Option[Session]) => {
>     // Reuse the stored session for this key, or start an empty one
>     val session = state.getOrElse(Session(id = event.sessionId, events = List()))
>     session.events = session.events :+ event
>     (session, Some(session))
>   })
>
> The problem is that there is no reliable way of knowing the end of a
> session, since events are likely to get lost. If I keep this process
> running, the number of stored sessions will keep growing until it fills up
> the disk.
>
> Is there a recommended way of periodically evicting sessions that are too
> old (e.g. a day old)?
>
>
>
> Thanks,
> Jack
>
