A little more info.  Here is a simplified version of my trigger
(windowConfiguration.timespan is the duration of the window):

class CustomTrigger extends Trigger[QualifiedEvent, Window] {

  // Per-key state holding the timestamp of the registered timer (0 = none yet).
  val stateTimeDescr = new ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0)

  // Note: windowConfigDescr (the state descriptor for the window
  // configuration) is not shown in this simplified version.

  override def onElement(event: QualifiedEvent, timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {

    val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
    val windowConfigurationState = ctx.getPartitionedState(windowConfigDescr)
    var windowConfiguration = windowConfigurationState.value()
    if (windowConfiguration == null) {
      // First event for this key: remember its window configuration.
      windowConfigurationState.update(event.alertConfiguration.window.get)
      windowConfiguration = event.alertConfiguration.window.get
    }

    if (relevantTimestamp.value() == 0) {
      // No timer registered yet: fire one timespan after this event's creation time.
      val fireAt = event.event.created.toEpochMilli + windowConfiguration.timespan.toMillis
      ctx.registerEventTimeTimer(fireAt)
      relevantTimestamp.update(fireAt)
    }

    TriggerResult.CONTINUE
  }

  override def onEventTime(timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

And here is the actual window execution:

val stream = env.fromCollection(inputEvents)
        .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
        .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
        .window(GlobalWindows.create)
        .trigger(ConfigurableTrigger.create)
        .apply(new GrouperFunction).name("Grouper Function")

Oddly enough, when I do this with just a basic time window, it works and
I only get the two events I am supposed to:

val stream = env.fromCollection(inputEvents)
        .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
        .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
        .timeWindow(Time.minutes(5))
        .apply(new GrouperFunction).name("Grouper Function")
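
For reference, here is a minimal sketch in plain Scala (no Flink) of the
grouping expected for the three-event example in the quoted message below.
The Event case class, the ExpectedWindow object, and the 2016-08-03 date are
hypothetical stand-ins, and the sketch assumes the window should hold only
events created strictly before the first event's timestamp plus the timespan:

```scala
import java.time.{Duration, Instant}

// Hypothetical stand-in for the real event type; only the timestamp matters here.
final case class Event(name: String, created: Instant)

object ExpectedWindow {
  // Keep only the events created strictly before (first event + timespan),
  // i.e. before the point where the first event's timer would fire.
  def expected(events: Seq[Event], timespan: Duration): Seq[Event] =
    events.headOption match {
      case None => Seq.empty
      case Some(first) =>
        val cutoff = first.created.plus(timespan)
        events.filter(_.created.isBefore(cutoff))
    }

  def main(args: Array[String]): Unit = {
    // Arbitrary date; only the minute offsets (12:01, 12:03, 12:20) matter.
    val noon = Instant.parse("2016-08-03T12:00:00Z")
    val events = Seq(
      Event("Event 1", noon.plusSeconds(1 * 60)),
      Event("Event 2", noon.plusSeconds(3 * 60)),
      Event("Event 3", noon.plusSeconds(20 * 60))
    )
    // Prints: List(Event 1, Event 2)
    println(expected(events, Duration.ofMinutes(5)).map(_.name))
  }
}
```

Event 3 (12:20) falls after the 12:06 cutoff, so it should not be part of the
result.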


On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:

> Hey guys,
>
> I am trying to use event time along with a custom window to capture a
> subset of events.  The problem I am running into is that it seems that the
> event that generates the timestamp/watermark arrives in the window before
> the onEventTime() call is made that closes the window.  Example:
>
> Window is supposed to capture 5 minutes of events after first event arrives
> Event 1: timestamp 12:01 - registers event timer for 12:06
> Event 2: timestamp 12:03
> Event 3: timestamp 12:20 - fires and purges window
>
> I get all three events in the window, instead of just the two that are
> really within the 5 minute window.
>
> Is there some way to force the timestamp to arrive in the window before the
> event that generated it?
>
> Thanks!
>
> --
> *Jason Brelloch* | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> <http://www.bettercloud.com/>
> Subscribe to the BetterCloud Monitor
> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>  -
> Get IT delivered to your inbox
>
