Hi Bart, Thank you so much for sharing the approach. Looks like this solved my problem. Here's what I have as an implementation for my use-case:
package org.apache.flink.quickstart

import org.apache.flink.api.common.state.{ ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor }
import org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{ Trigger, TriggerResult }
import org.apache.flink.streaming.api.windowing.windows.Window
import org.slf4j.LoggerFactory

/**
 * Trigger that fires on every incoming element and purges the window once no
 * element has arrived for a configured pause, measured in processing time.
 *
 * Each element cancels the previously registered processing-time timer (if any)
 * and registers a new one `sessionPauseHours` time units in the future. When that
 * timer fires, the window contents are purged.
 *
 * NOTE(review): despite its name, `sessionPauseHours` is converted with
 * `Time.seconds`, i.e. the pause is currently interpreted as SECONDS. The
 * behaviour is kept as-is here; confirm the intended unit and either rename the
 * parameter or switch to `Time.hours`.
 *
 * @param sessionPauseHours length of the inactivity gap (see unit note above)
 * @tparam E type of the elements in the window
 */
class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E, Window] {

  /** Per-key/window state holding the timestamp of the currently registered timer, if any. */
  val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None)

  /**
   * Re-arms the inactivity timer and fires the window for the new element.
   *
   * @return always `TriggerResult.FIRE`
   */
  override def onElement(t: E, l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    val pendingTimer: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)

    // Cancel the timer registered by the previous element, if there is one.
    pendingTimer.value().foreach(ts => triggerContext.deleteProcessingTimeTimer(ts))

    // Register a fresh timer one pause-length from now and remember it.
    val pauseMillis = Time.seconds(sessionPauseHours).toMilliseconds()
    val fireAt = triggerContext.getCurrentProcessingTime + pauseMillis
    pendingTimer.update(Some(fireAt))
    triggerContext.registerProcessingTimeTimer(fireAt)

    TriggerResult.FIRE
  }

  /**
   * Called when the inactivity timer fires: forgets the (now consumed) timer and
   * purges the window contents.
   *
   * @return always `TriggerResult.PURGE`
   */
  override def onProcessingTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    // The timer has fired, so the stored timestamp is stale; reset it so the next
    // element does not try to delete a timer that no longer exists.
    triggerContext.getPartitionedState(timeState).clear()
    TriggerResult.PURGE
  }

  /** Event time is not used by this trigger. */
  override def onEventTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /**
   * Releases everything this trigger holds for the window: the pending
   * processing-time timer and the bookkeeping state.
   */
  override def clear(w: Window, triggerContext: TriggerContext): Unit = {
    val pendingTimer = triggerContext.getPartitionedState(timeState)
    pendingTimer.value().foreach(ts => triggerContext.deleteProcessingTimeTimer(ts))
    pendingTimer.clear()
  }
}

// Regards, Anchit
// --
// View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9676.html
// Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.