Hi Padarn, Flink does not generate watermarks per keys. Atm watermarks are always global. Therefore, I would suspect that it is rather a problem with generating watermarks at all. Could it be that your input data does not span a period longer than 5 minutes and also does not terminate? Another problem could be the CountTrigger which should not react to the window's end time. The method onEventTime simply returns TriggerResult.CONTINUE and I think this will cause the window to not fire. Maybe a working example program with example input could be helpful for further debugging.
Cheers, Till On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote: > Hi Flink Mailing List, > > Long story short - I want to somehow collapse watermarks at an operator > across keys, so that keys with dragging watermarks do not drag behind. > Details below: > > --- > > I have an application in which I want to perform the follow sequence of > steps: Assume my data is made up of data that has: (time, user, location, > action) > > -> Read source > -> KeyBy (UserId, Location) > -> EventTimeSessionWindow (5 min gap) - results in (User Location Session) > -> TriggerOnFirst event > -> KeyBy (Location) > -> SlidingEventTimeWindow(5min length, 5 second gap) > -> Count > > The end intention is to count the number of unique users in a given > location - the EventTimeSessionWindow is used to make sure users are only > counted once. > > So I created a custom Trigger, which is the same as CountTrigger, but has > the following `TriggerResult" funtion: > > @Override > public TriggerResult onElement(Object element, long timestamp, W window, > TriggerContext ctx) throws Exception { > ReducingState<Long> count = ctx.getPartitionedState(stateDesc); > count.add(1L); > if (count.get() == maxCount) { > return TriggerResult.FIRE_AND_PURGE; > } else if (count.get() > maxCount) { > return TriggerResult.PURGE; > } > return TriggerResult.CONTINUE; > > } > > But my final SlidingEventTimeWindow does not fire properly. This is > because (I assume) there are some users with sessions windows that are not > closed, and so the watermark for those keys is running behind and so the > SlidingEventTimeWindow watermark is held back too. > > What I feel like I want to achieve is essentially setting the watermark of > the SlidingEventTimeWindow operator to be the maximum (with lateness) of > the input keys, rather than the minimum, but I cannot tell if this is > possible, and if not, what another approach could be. > > Thanks, > Padarn >