A little more info. Here is a simplified version of my trigger (windowConfiguration.timespan is the duration of the window):

class CustomTrigger extends Trigger[QualifiedEvent, Window] {

  val stateTimeDescr = new ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0)

  override def onElement(event: QualifiedEvent, timestamp: Long, W: Window, ctx: TriggerContext): TriggerResult = {

    val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
    val windowConfigurationState = ctx.getPartitionedState(windowConfigDescr)
    var windowConfiguration = windowConfigurationState.value()

    if(windowConfiguration == null) {
      windowConfigurationState.update(event.alertConfiguration.window.get)
      windowConfiguration = event.alertConfiguration.window.get
    }

    if(relevantTimestamp.value() == 0) {
      ctx.registerEventTimeTimer(event.event.created.toEpochMilli + windowConfiguration.timespan.toMillis)
      relevantTimestamp.update(event.event.created.toEpochMilli + windowConfiguration.timespan.toMillis)
    }

    TriggerResult.CONTINUE
  }

  override def onEventTime(timestamp: Long, W: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(timestamp: Long, W: Window, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

And here is the actual window execution:

val stream = env.fromCollection(inputEvents)
  .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
  .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
  .window(GlobalWindows.create)
  .trigger(ConfigurableTrigger.create)
  .apply(new GrouperFunction).name("Grouper Function")

Oddly enough when I do this with just a basic window function it works and I only get the two events I am supposed to:

val stream = env.fromCollection(inputEvents)
  .assignAscendingTimestamps((e: QualifiedEvent) => { e.event.created.toEpochMilli })
  .keyBy((e: QualifiedEvent) => { e.alertConfiguration.alertId.toString })
  .timeWindow(Time.minutes(5))
  .apply(new GrouperFunction).name("Grouper Function")

On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:

> Hey guys,
>
> I am trying to use event time along with a custom window to capture a
> subset of events. The problem I am running into is that it seems that
> the event that generates the timestamp/watermark arrives in the window
> before the onEventTime() call is made that closes the window. Example:
>
> Window is supposed to capture 5 minutes of events after first event arrives
> Event 1: timestamp 12:01 - registers event timer for 12:06
> Event 2: timestamp 12:03
> Event 3: timestamp 12:20 - fires and purges window
>
> I get all three events in the window, instead of just the two that are
> really within the 5 minute window.
>
> Is there some way to force the timestamp to arrive in the window before the
> event that generated it?
>
> Thanks!
>
> --
> *Jason Brelloch* | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> <http://www.bettercloud.com/>

--
*Jason Brelloch* | Product Developer
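
To make the ordering in the 12:01 / 12:03 / 12:20 example concrete, here is a small Flink-free simulation. It is only a sketch: it assumes that each element is buffered in the window before the watermark derived from that element can fire the timer, which is the behaviour described above. The names TimerOrderingSketch, Event, windowMillis and run are hypothetical and not part of any Flink API. It also shows one possible workaround, which is to drop buffered elements at or past the registered deadline when the window fires.

```scala
// Flink-free simulation of the ordering described in the email.
// All names here are hypothetical; nothing below is a Flink API.
object TimerOrderingSketch {

  final case class Event(name: String, timestamp: Long)

  // Five-minute window, in milliseconds.
  val windowMillis: Long = 5 * 60 * 1000L

  /** Returns (elements buffered when the timer fires, elements before the deadline),
    * or None if the timer never fires.
    *
    * Assumption: an element is added to the window first, and only then does
    * the watermark derived from its timestamp advance and fire the timer.
    */
  def run(events: Seq[Event]): Option[(Seq[Event], Seq[Event])] =
    events.headOption.flatMap { first =>
      // The first element registers the timer, as in onElement above.
      val deadline = first.timestamp + windowMillis
      // First element whose own timestamp moves the watermark to the deadline.
      val firingIndex = events.indexWhere(_.timestamp >= deadline)
      if (firingIndex < 0) None
      else {
        // That element is already buffered when the timer fires.
        val buffered = events.take(firingIndex + 1)
        // Possible workaround: keep only elements strictly before the deadline.
        val inRange = buffered.filter(_.timestamp < deadline)
        Some((buffered, inRange))
      }
    }

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    // Timestamps are offsets from 12:00, matching the example in the email.
    val events = Seq(
      Event("Event 1", 1 * minute),
      Event("Event 2", 3 * minute),
      Event("Event 3", 20 * minute)
    )
    run(events).foreach { case (buffered, inRange) =>
      println("fired with: " + buffered.map(_.name).mkString(", "))
      println("before deadline: " + inRange.map(_.name).mkString(", "))
    }
  }
}
```

Under that assumption, the simulated window fires with all three events, and the filter keeps only Event 1 and Event 2. In the real job the same filter would have to live somewhere with access to the registered deadline, for example inside the window function.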