Here is a session trigger that I wrote. Its rules for what counts as a session
are not quite the same as yours, but it should hopefully be a good starting
point. I'd love to get any feedback on how it could be improved.

- bart


import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

import org.slf4j.LoggerFactory

/**
  * Fires a session when it reaches a maximum number of elements or after a
  * period of inactivity.
  *
  * @param maxSessionLength maximum number of pages that can be in a single session
  * @param sessionPauseMillis time duration in milliseconds indicating length of
  *                           quiet after which a session has ended
  */
class sessionTrigger[E](val maxSessionLength: Int, val sessionPauseMillis: Long)
  extends Trigger[E, Window] {

  val logger = LoggerFactory.getLogger(classOf[sessionTrigger[E]])
  val countState = new ReducingStateDescriptor[Long](
    "sessionCount", new ScalaReduceFunction[Long](_ + _), classOf[Long])
  val timeState = new ValueStateDescriptor[Option[Long]](
    "sessionTimer", classOf[Option[Long]], None)

  override def onElement(t: E, l: Long, w: Window,
                         triggerContext: TriggerContext): TriggerResult = {
    logger.trace("received: " + t.toString)

    // remove the timer registered for the previous element, if any
    val time_state: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)
    val time_set = time_state.value()
    if (time_set.isDefined) {
      triggerContext.deleteProcessingTimeTimer(time_set.get)
    }

    // update count (including this element) and check if the limit is reached
    val state: ReducingState[Long] = triggerContext.getPartitionedState(countState)
    state.add(1)
    val ct = state.get()
    logger.trace("count: " + ct + " : " + t.toString)
    if (ct >= maxSessionLength) {
      logger.trace("triggered by count")
      // reset the state so that the next session starts from zero
      state.clear()
      time_state.update(None)
      TriggerResult.FIRE_AND_PURGE
    } else {
      // restart the inactivity timer and continue
      val new_time = triggerContext.getCurrentProcessingTime + sessionPauseMillis
      time_state.update(Some(new_time))
      triggerContext.registerProcessingTimeTimer(new_time)
      TriggerResult.CONTINUE
    }
  }

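  override def onProcessingTime(l: Long, w: Window,
                                triggerContext: TriggerContext): TriggerResult = {
    logger.trace("triggered by processing time")
    // the session ended by inactivity: reset the state before emitting it
    triggerContext.getPartitionedState(countState).clear()
    triggerContext.getPartitionedState(timeState).update(None)
    TriggerResult.FIRE_AND_PURGE
  }

  override def onEventTime(l: Long, w: Window,
                           triggerContext: TriggerContext): TriggerResult = {
    // this trigger works on processing time only, so event time is ignored
    logger.trace("event time trigger")
    TriggerResult.CONTINUE
  }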
}



________________________________
From: Manu Zhang [owenzhang1...@gmail.com]
Sent: Friday, 21 October 2016 10:52
To: user@flink.apache.org
Subject: Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Hi Anchit,

I think you need a customized 
EventTimeTrigger<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java#L43>
 that returns "TriggerResult.FIRE" both on each new element and on the watermark.
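
A minimal sketch of such a trigger is given here for illustration. It is
modeled on Flink's EventTimeTrigger and assumes Flink 1.2 or later, where
clear() must be implemented and onMerge() returns no value. The class name
FireOnElementEventTimeTrigger is hypothetical and is not part of Flink.

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical name; a sketch modeled on Flink's EventTimeTrigger (assumes Flink 1.2+).
class FireOnElementEventTimeTrigger extends Trigger[Object, TimeWindow] {

  override def onElement(element: Object, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // keep a timer at the end of the window so that the watermark can fire it too
    ctx.registerEventTimeTimer(window.maxTimestamp())
    // evaluate the window on every new element without purging its contents
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    // fire once more when the watermark reaches the end of the window
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())

  // session windows are merged as elements arrive, so merging must be supported
  override def canMerge(): Boolean = true

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit =
    // re-register the timer for the end of the merged window
    ctx.registerEventTimeTimer(window.maxTimestamp())
}
```

It would be attached to a keyed stream after
.window(EventTimeSessionWindows.withGap(...)) via
.trigger(new FireOnElementEventTimeTrigger). If the window should not be
evaluated again when it expires, returning TriggerResult.CONTINUE from
onEventTime instead of FIRE is one option to try.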

Thanks,
Manu Zhang

On Fri, Oct 21, 2016 at 3:08 PM Anchit Jatana <development.anc...@gmail.com> wrote:
Hi All,

I have a use case in which I need to work with session windows to maintain 
some values for some session IDs/keys.

The use case is as follows:

I need to maintain a session window for the incoming data and discard the 
window after a set gap/period of inactivity. However, as soon as a new element 
is added to the window, I want all the records currently in the window to be 
processed by the window transformation/function, without the window being 
discarded.

With the existing session windows implementation, elements get processed only 
after the window is considered complete (based on the gap time setting). I 
would like to process all the elements contained in the window as soon as a new 
element is added (that is, the addition of a new element triggers the 
processing of all elements of the window), while the window is discarded only 
after a gap/inactivity of some set time. When the window is discarded/expires, 
I don't want it to be re-evaluated, since its contents were already processed 
when the last element was added.
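
To make the requested behaviour concrete, here is a small stand-alone model in
plain Scala. It does not use Flink; Event, SessionModel and gapMillis are
hypothetical names chosen for illustration, and events are assumed to arrive in
timestamp order. Every added element returns the full contents of its session,
and a session that has been quiet for longer than the gap is dropped without
being evaluated again.

```scala
// Hypothetical stand-alone model; none of these names come from Flink.
final case class Event(key: String, timestamp: Long)

final class SessionModel(gapMillis: Long) {
  // key -> events of the currently open session, oldest first
  private var open = Map.empty[String, Vector[Event]]

  /** Adds an event and returns the whole session it now belongs to (one "firing"). */
  def add(e: Event): Vector[Event] = {
    val current = open.getOrElse(e.key, Vector.empty)
    // a quiet period strictly longer than gapMillis ends the old session;
    // the old session is discarded silently, it is not emitted again
    val expired = current.lastOption.exists(last => e.timestamp - last.timestamp > gapMillis)
    val updated = (if (expired) Vector.empty else current) :+ e
    open = open.updated(e.key, updated)
    updated
  }
}

object SessionModelDemo extends App {
  val model = new SessionModel(gapMillis = 1000L)
  println(model.add(Event("a", 0L)).size)    // 1
  println(model.add(Event("a", 500L)).size)  // 2: same session, all elements re-processed
  println(model.add(Event("a", 5000L)).size) // 1: the gap was exceeded, a new session starts
}
```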

Is this possible? If so, could someone please share some sample code to 
explain the implementation?

Thank you!

Regards,
Anchit
