Hi Till,

I will work on an example, but I’m a little confused about how keyBy and watermarks work in this case. This part of the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams) says:
Some operators consume multiple input streams; a union, for example, or operators following a *keyBy(…)* or *partition(…)* function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.

Does this imply that keyBy splits the watermark per key?

On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Padarn,
>
> Flink does not generate watermarks per key. Atm watermarks are always
> global. Therefore, I would suspect that it is rather a problem with
> generating watermarks at all. Could it be that your input data does not
> span a period longer than 5 minutes and also does not terminate? Another
> problem could be the CountTrigger, which does not react to the window's
> end time: its onEventTime method simply returns TriggerResult.CONTINUE, and
> I think this will cause the window to not fire. Maybe a working example
> program with example input could be helpful for further debugging.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>
>> Hi Flink Mailing List,
>>
>> Long story short: I want to somehow collapse watermarks at an operator
>> across keys, so that keys with lagging watermarks do not hold it back.
>> Details below:
>>
>> ---
>>
>> I have an application in which I want to perform the following sequence of
>> steps. Assume each record has the fields (time, user, location, action):
>>
>> -> Read source
>> -> KeyBy (UserId, Location)
>> -> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
>> -> Trigger on first event
>> -> KeyBy (Location)
>> -> SlidingEventTimeWindow (5 min length, 5 second slide)
>> -> Count
>>
>> The end intention is to count the number of unique users in a given
>> location; the EventTimeSessionWindow is used to make sure users are only
>> counted once.
>>
>> So I created a custom Trigger, which is the same as CountTrigger but has
>> the following `onElement` method:
>>
>>     @Override
>>     public TriggerResult onElement(Object element, long timestamp, W window,
>>             TriggerContext ctx) throws Exception {
>>         ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>         count.add(1L);
>>         if (count.get() == maxCount) {
>>             return TriggerResult.FIRE_AND_PURGE;
>>         } else if (count.get() > maxCount) {
>>             return TriggerResult.PURGE;
>>         }
>>         return TriggerResult.CONTINUE;
>>     }
>>
>> But my final SlidingEventTimeWindow does not fire properly. I assume this
>> is because some users have session windows that are not closed, so the
>> watermark for those keys is running behind and the SlidingEventTimeWindow
>> watermark is held back too.
>>
>> What I feel I want to achieve is essentially setting the watermark of the
>> SlidingEventTimeWindow operator to the maximum (with lateness) of the
>> input keys rather than the minimum, but I cannot tell if this is possible
>> and, if not, what another approach could be.
>>
>> Thanks,
>> Padarn
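To make the quoted documentation rule concrete, here is a minimal plain-Java model (not Flink code) of an operator with several inputs: it remembers the latest watermark seen on each upstream input channel and uses the minimum as its own event time. The class name ChannelWatermarkModel is hypothetical, and details such as idle-channel handling are left out. As far as I understand, the "input streams" of an operator after a keyBy are the parallel upstream subtasks feeding it, so keys never enter the calculation, which is consistent with Till's point that watermarks are not tracked per key.

```java
import java.util.Arrays;

/**
 * Plain-Java model (hypothetical, not Flink's implementation) of how an
 * operator with several input channels derives its event time.
 */
class ChannelWatermarkModel {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    ChannelWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream channel and returns the operator's watermark. */
    long onWatermark(int channel, long watermark) {
        // Ignore a watermark that would move this channel backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The operator only advances, and only as far as its slowest channel.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Two parallel upstream subtasks feeding one downstream subtask.
        ChannelWatermarkModel op = new ChannelWatermarkModel(2);
        System.out.println(op.onWatermark(0, 1_000)); // prints -9223372036854775808 (channel 1 silent)
        System.out.println(op.onWatermark(1, 400));   // prints 400
        System.out.println(op.onWatermark(1, 2_000)); // prints 1000
    }
}
```

In this model one slow channel holds back the operator's event time. Swapping Math.min for Math.max would let the operator's watermark overtake the slower channel, so records still in flight on that channel would arrive behind the watermark and be treated as late.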
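The counting logic of the custom trigger can be modeled the same way. The sketch below is plain Java, not Flink's API: FireOnFirstModel and its Result enum are hypothetical stand-ins for the trigger and TriggerResult, and a long field plays the role of the ReducingState<Long> count for a single window. It shows that with maxCount = 1 the first element fires and every later element is purged, and that onEventTime, which per Till's reply behaves as in CountTrigger, never fires.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (hypothetical, not Flink code) of the quoted onElement
 * logic for one window. Result stands in for Flink's TriggerResult.
 */
class FireOnFirstModel {
    enum Result { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long maxCount;
    private long count; // stands in for the per-window ReducingState<Long>

    FireOnFirstModel(long maxCount) {
        this.maxCount = maxCount;
    }

    Result onElement() {
        count += 1;
        if (count == maxCount) {
            return Result.FIRE_AND_PURGE; // emit once and drop buffered elements
        } else if (count > maxCount) {
            return Result.PURGE;          // later elements are dropped, never emitted
        }
        return Result.CONTINUE;
    }

    /** Like CountTrigger, the window is never fired by event time. */
    Result onEventTime() {
        return Result.CONTINUE;
    }

    /** Feeds the given number of elements into one window and collects the results. */
    static List<Result> run(long maxCount, int elements) {
        FireOnFirstModel trigger = new FireOnFirstModel(maxCount);
        List<Result> results = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            results.add(trigger.onElement());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 3)); // prints [FIRE_AND_PURGE, PURGE, PURGE]
    }
}
```

Because the quoted code never resets the count after FIRE_AND_PURGE, each window in this model emits at most once; merging of session windows is not modeled.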